diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index 22377ea1769c8..8fcc76e018a6c 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -34,13 +34,10 @@ public void testBulkIndexCreatesMapping() throws Exception { BulkRequestBuilder bulkBuilder = client().prepareBulk(); bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON); bulkBuilder.get(); - assertBusy(new Runnable() { - @Override - public void run() { - GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get(); - assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30")); - assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs")); - } + assertBusy(() -> { + GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get(); + assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30")); + assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs")); }); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index ba488cecb38f8..29235329d6669 100644 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -275,12 +275,7 @@ public void testRetryOfAnAlreadyTimedOutRequest() throws Exception { transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); // wait until the timeout was triggered and we actually tried to send for the second time - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(transport.capturedRequests().length, equalTo(1)); - } - }); + assertBusy(() -> assertThat(transport.capturedRequests().length, equalTo(1))); // let it fail the second time too requestId = transport.capturedRequests()[0].requestId; diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 3431c4dd2dfa5..a847d342174ed 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; @@ -37,7 +36,6 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.disruption.NetworkDisruption; -import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay; import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -161,12 +159,9 @@ public void testSimpleMinimumMasterNodes() throws Exception { } internalCluster().stopRandomNonMasterNode(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); - } + assertBusy(() -> { + ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); }); logger.info("--> starting the previous master node again..."); @@ -408,12 +403,7 @@ public void onFailure(String source, Exception e) { latch.await(); assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class)); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()); - } - }); + assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue())); partition.stopDisrupting(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java index 203d70df65a11..778394520a5ac 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java @@ -70,12 +70,7 @@ public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception { ensureGreen("test"); indexRandomData(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)); - } - }); + assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true))); assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); internalCluster().startNode(); // this will use the same data location as the stopped node ensureGreen("test"); @@ -116,12 +111,7 @@ public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception { ensureGreen("test"); indexRandomData(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)); - } - }); + assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true))); assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))).get()); ensureGreen("test"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 06ea7de325835..6a1309f39da9e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -120,12 +120,7 @@ private void createStaleReplicaScenario() throws Exception { logger.info("--> check that old primary shard does not get promoted to primary again"); // kick reroute and wait for all shard states to be fetched client(master).admin().cluster().prepareReroute().get(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)); - } - }); + assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0))); // kick reroute a second time and check that all shards are unassigned assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2)); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 7e05448bf91ab..7aeab5c940f18 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -50,7 +51,7 @@ public void testPrimaryFailureIncreasesTerm() throws Exception { logger.info("--> waiting for a yellow index"); // JDK 9 type inference gets confused, so we have to help the // type inference - assertBusy(((Runnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), + assertBusy(((CheckedRunnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)))); final long term0 = shard == 0 ? 2 : 1; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 51ddc0f3fd9b3..93ac2878abc0f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -57,12 +57,9 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { List nodes = internalCluster().startNodes(3); // Wait for all 3 nodes to be up - assertBusy(new Runnable() { - @Override - public void run() { - NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); - assertThat(resp.getNodes().size(), equalTo(3)); - } + assertBusy(() -> { + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); + assertThat(resp.getNodes().size(), equalTo(3)); }); // Start with all nodes at 50% usage @@ -86,13 +83,10 @@ public void run() { ensureGreen("test"); // Block until the "fake" cluster info is retrieved at least once - assertBusy(new Runnable() { - @Override - public void run() { - ClusterInfo info = cis.getClusterInfo(); - logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); - assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); - } + assertBusy(() -> { + ClusterInfo info = cis.getClusterInfo(); + logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); + assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); }); final List realNodeNames = new ArrayList<>(); @@ -113,21 +107,18 @@ public void run() { // Retrieve the count of shards on each node final Map nodesToShardCount = new HashMap<>(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - Iterator iter = resp.getState().getRoutingNodes().iterator(); - while (iter.hasNext()) { - RoutingNode node = iter.next(); - logger.info("--> node {} has {} shards", - node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5)); - assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); - assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); + assertBusy(() -> { + ClusterStateResponse resp12 = client().admin().cluster().prepareState().get(); + Iterator iter12 = resp12.getState().getRoutingNodes().iterator(); + while (iter12.hasNext()) { + RoutingNode node = iter12.next(); + logger.info("--> node {} has {} shards", + node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); } + assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5)); + assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); + assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); }); // Update the disk usages so one node is now back under the high watermark @@ -138,21 +129,18 @@ public void run() { // Retrieve the count of shards on each node nodesToShardCount.clear(); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - Iterator iter = resp.getState().getRoutingNodes().iterator(); - while (iter.hasNext()) { - RoutingNode node = iter.next(); - logger.info("--> node {} has {} shards", - node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); - } - assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3)); - assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3)); - assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3)); + assertBusy(() -> { + ClusterStateResponse resp1 = client().admin().cluster().prepareState().get(); + Iterator iter1 = resp1.getState().getRoutingNodes().iterator(); + while (iter1.hasNext()) { + RoutingNode node = iter1.next(); + logger.info("--> node {} has {} shards", + node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); } + assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3)); + assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3)); + assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3)); }); } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 72db2911fc023..142123bb48349 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -229,12 +229,9 @@ public void run() { assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); barrier.await(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat("wrong active count", pool.getActiveCount(), equalTo(0)); - assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); - } + assertBusy(() -> { + assertThat("wrong active count", pool.getActiveCount(), equalTo(0)); + assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); }); terminate(pool); } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 3ed105080b30b..17b43a079dc53 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -264,12 +264,7 @@ public void run() { // the timeout handler is added post execution (and quickly cancelled). We have allow for this // and use assert busy - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(timer.getQueue().size(), equalTo(0)); - } - }, 5, TimeUnit.SECONDS); + assertBusy(() -> assertThat(timer.getQueue().size(), equalTo(0)), 5, TimeUnit.SECONDS); assertThat(timeoutCalled.get(), equalTo(false)); assertTrue(terminate(executor)); assertTrue(terminate(threadPool)); diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 5e8476fb54535..ff77a512f9007 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -1324,34 +1324,28 @@ private void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exc } private void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = getNodeClusterState(node); - assertNull("node [" + node + "] still has [" + state.nodes().getMasterNode() + "] as master", state.nodes().getMasterNode()); - if (expectedBlocks != null) { - for (ClusterBlockLevel level : expectedBlocks.levels()) { - assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock - (level)); - } + assertBusy(() -> { + ClusterState state = getNodeClusterState(node); + assertNull("node [" + node + "] still has [" + state.nodes().getMasterNode() + "] as master", state.nodes().getMasterNode()); + if (expectedBlocks != null) { + for (ClusterBlockLevel level : expectedBlocks.levels()) { + assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock + (level)); } } }, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); } private void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = getNodeClusterState(node); - String masterNode = null; - if (state.nodes().getMasterNode() != null) { - masterNode = state.nodes().getMasterNode().getName(); - } - logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode()); - assertThat("node [" + node + "] still has [" + masterNode + "] as master", - oldMasterNode, not(equalTo(masterNode))); + assertBusy(() -> { + ClusterState state = getNodeClusterState(node); + String masterNode = null; + if (state.nodes().getMasterNode() != null) { + masterNode = state.nodes().getMasterNode().getName(); } + logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode()); + assertThat("node [" + node + "] still has [" + masterNode + "] as master", + oldMasterNode, not(equalTo(masterNode))); }, 10, TimeUnit.SECONDS); } diff --git a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java index f410ebfc48d83..8bb81f8a976e1 100644 --- a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -130,24 +130,21 @@ private void assertShardInfo(ReplicationResponse response, int expectedTotal, in } private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState state = client().admin().cluster().prepareState().get().getState(); - assertThat(state.routingTable().index("idx"), not(nullValue())); - assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue())); - assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount)); - - ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx") - .setWaitForNoRelocatingShards(true) - .get(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - - RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") - .setActiveOnly(true) - .get(); - assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0)); - } + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + assertThat(state.routingTable().index("idx"), not(nullValue())); + assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue())); + assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount)); + + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx") + .setWaitForNoRelocatingShards(true) + .get(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") + .setActiveOnly(true) + .get(); + assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0)); }); } } diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index 73d1d41449043..7c3d4c1dce620 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.index; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; @@ -632,16 +631,13 @@ public void testShadowReplicaNaturalRelocation() throws Exception { final String node3 = internalCluster().startNode(nodeSettings); nodes.add(node3); - assertBusy(new Runnable() { - @Override - public void run() { - client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - RoutingNodes nodes = resp.getState().getRoutingNodes(); - for (RoutingNode node : nodes) { - logger.info("--> node has {} shards (needs at least 2)", node.numberOfOwningShards()); - assertThat("at least 2 shards on node", node.numberOfOwningShards(), greaterThanOrEqualTo(2)); - } + assertBusy(() -> { + client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + ClusterStateResponse resp = client().admin().cluster().prepareState().get(); + RoutingNodes nodes1 = resp.getState().getRoutingNodes(); + for (RoutingNode node : nodes1) { + logger.info("--> node has {} shards (needs at least 2)", node.numberOfOwningShards()); + assertThat("at least 2 shards on node", node.numberOfOwningShards(), greaterThanOrEqualTo(2)); } }); ensureYellow(IDX); @@ -693,16 +689,13 @@ public void testShadowReplicasUsingFieldData() throws Exception { /** wait until none of the nodes have shards allocated on them */ private void assertNoShardsOn(final List nodeList) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - RoutingNodes nodes = resp.getState().getRoutingNodes(); - for (RoutingNode node : nodes) { - logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards()); - if (nodeList.contains(node.node().getName())) { - assertThat("no shards on node", node.numberOfOwningShards(), equalTo(0)); - } + assertBusy(() -> { + ClusterStateResponse resp = client().admin().cluster().prepareState().get(); + RoutingNodes nodes = resp.getState().getRoutingNodes(); + for (RoutingNode node : nodes) { + logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards()); + if (nodeList.contains(node.node().getName())) { + assertThat("no shards on node", node.numberOfOwningShards(), equalTo(0)); } } }, 1, TimeUnit.MINUTES); @@ -710,16 +703,13 @@ public void run() { /** wait until the node has the specified number of shards allocated on it */ private void assertShardCountOn(final String nodeName, final int shardCount) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - ClusterStateResponse resp = client().admin().cluster().prepareState().get(); - RoutingNodes nodes = resp.getState().getRoutingNodes(); - for (RoutingNode node : nodes) { - logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards()); - if (nodeName.equals(node.node().getName())) { - assertThat(node.numberOfOwningShards(), equalTo(shardCount)); - } + assertBusy(() -> { + ClusterStateResponse resp = client().admin().cluster().prepareState().get(); + RoutingNodes nodes = resp.getState().getRoutingNodes(); + for (RoutingNode node : nodes) { + logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards()); + if (nodeName.equals(node.node().getName())) { + assertThat(node.numberOfOwningShards(), equalTo(shardCount)); } } }, 1, TimeUnit.MINUTES); diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index cf2804f1f8ac5..44f8e822c100f 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -406,14 +406,11 @@ protected Cancellable scheduleTask(ThreadPool threadPool) { imc.forceCheck(); // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool: - assertBusy(new Runnable() { - @Override - public void run() { - try (Engine.Searcher s2 = shard.acquireSearcher("index")) { - // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: - final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); - assertTrue(indexingBufferBytes2 < indexingBufferBytes1); - } + assertBusy(() -> { + try (Engine.Searcher s2 = shard.acquireSearcher("index")) { + // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: + final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); + assertTrue(indexingBufferBytes2 < indexingBufferBytes1); } }); } diff --git a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index 484f6e5db76aa..80001ed16ae21 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -381,16 +381,13 @@ public void testBucketBreaker() throws Exception { /** Issues a cache clear and waits 30 seconds for the field data breaker to be cleared */ public void clearFieldData() throws Exception { client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet(); - assertBusy(new Runnable() { - @Override - public void run() { - NodesStatsResponse resp = client().admin().cluster().prepareNodesStats() - .clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS)); - for (NodeStats nStats : resp.getNodes()) { - assertThat("fielddata breaker never reset back to 0", - nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), - equalTo(0L)); - } + assertBusy(() -> { + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats() + .clear().setBreaker(true).get(new TimeValue(15, TimeUnit.SECONDS)); + for (NodeStats nStats : resp.getNodes()) { + assertThat("fielddata breaker never reset back to 0", + nStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), + equalTo(0L)); } }, 30, TimeUnit.SECONDS); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 66244c174ab42..f5dea3ed84f3e 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -265,17 +265,13 @@ public void testRerouteRecovery() throws Exception { logger.info("--> waiting for recovery to start both on source and target"); final Index index = resolveIndex(INDEX_NAME); - assertBusy(new Runnable() { - @Override - public void run() { - - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA); - assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), - equalTo(1)); - indicesService = internalCluster().getInstance(IndicesService.class, nodeB); - assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), - equalTo(1)); - } + assertBusy(() -> { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), + equalTo(1)); + indicesService = internalCluster().getInstance(IndicesService.class, nodeB); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), + equalTo(1)); }); logger.info("--> request recoveries"); @@ -314,19 +310,16 @@ public void run() { logger.info("--> checking throttling increases"); final long finalNodeAThrottling = nodeAThrottling; final long finalNodeBThrottling = nodeBThrottling; - assertBusy(new Runnable() { - @Override - public void run() { - NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); - assertThat(statsResponse.getNodes(), hasSize(2)); - for (NodeStats nodeStats : statsResponse.getNodes()) { - final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); - if (nodeStats.getNode().getName().equals(nodeA)) { - assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling)); - } - if (nodeStats.getNode().getName().equals(nodeB)) { - assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling)); - } + assertBusy(() -> { + NodesStatsResponse statsResponse1 = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); + assertThat(statsResponse1.getNodes(), hasSize(2)); + for (NodeStats nodeStats : statsResponse1.getNodes()) { + final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); + if (nodeStats.getNode().getName().equals(nodeA)) { + assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeAThrottling)); + } + if (nodeStats.getNode().getName().equals(nodeB)) { + assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(finalNodeBThrottling)); } } }); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index caad0ae81b85e..79ee9553167b5 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -158,12 +158,7 @@ Timer createObj() { Timer lastRead = streamer.serializeDeserialize(); final long time = lastRead.time(); assertThat(time, lessThanOrEqualTo(timer.time())); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time())); - } - }); + assertBusy(() -> assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time()))); assertThat("captured time shouldn't change", lastRead.time(), equalTo(time)); if (randomBoolean()) { diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index ec7e9f9619f6e..38440d705080b 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -270,23 +270,20 @@ public void onFailure(Exception e) { } }); // ...and wait for mappings to be available on master - assertBusy(new Runnable() { - @Override - public void run() { - ImmutableOpenMap indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index"); - assertNotNull(indexMappings); - MappingMetaData typeMappings = indexMappings.get("type"); - assertNotNull(typeMappings); - Object properties; - try { - properties = typeMappings.getSourceAsMap().get("properties"); - } catch (IOException e) { - throw new AssertionError(e); - } - assertNotNull(properties); - Object fieldMapping = ((Map) properties).get("field"); - assertNotNull(fieldMapping); + assertBusy(() -> { + ImmutableOpenMap indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index"); + assertNotNull(indexMappings); + MappingMetaData typeMappings = indexMappings.get("type"); + assertNotNull(typeMappings); + Object properties; + try { + properties = typeMappings.getSourceAsMap().get("properties"); + } catch (IOException e) { + throw new AssertionError(e); } + assertNotNull(properties); + Object fieldMapping = ((Map) properties).get("field"); + assertNotNull(fieldMapping); }); final AtomicReference docIndexResponse = new AtomicReference<>(); @@ -311,17 +308,14 @@ public void onFailure(Exception e) { // Now make sure the indexing request finishes successfully disruption.stopDisrupting(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); - PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); - assertTrue(resp.isAcknowledged()); - assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); - IndexResponse docResp = (IndexResponse) docIndexResponse.get(); - assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), - 1, docResp.getShardInfo().getTotal()); - } + assertBusy(() -> { + assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); + PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); + assertTrue(resp.isAcknowledged()); + assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); + IndexResponse docResp = (IndexResponse) docIndexResponse.get(); + assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), + 1, docResp.getShardInfo().getTotal()); }); } @@ -391,17 +385,14 @@ public void onFailure(Exception e) { }); final Index index = resolveIndex("index"); // Wait for mappings to be available on master - assertBusy(new Runnable() { - @Override - public void run() { - final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); - final IndexService indexService = indicesService.indexServiceSafe(index); - assertNotNull(indexService); - final MapperService mapperService = indexService.mapperService(); - DocumentMapper mapper = mapperService.documentMapper("type"); - assertNotNull(mapper); - assertNotNull(mapper.mappers().getMapper("field")); - } + assertBusy(() -> { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); + final IndexService indexService = indicesService.indexServiceSafe(index); + assertNotNull(indexService); + final MapperService mapperService = indexService.mapperService(); + DocumentMapper mapper = mapperService.documentMapper("type"); + assertNotNull(mapper); + assertNotNull(mapper.mappers().getMapper("field")); }); final AtomicReference docIndexResponse = new AtomicReference<>(); @@ -418,12 +409,7 @@ public void onFailure(Exception e) { }); // Wait for document to be indexed on primary - assertBusy(new Runnable() { - @Override - public void run() { - assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()); - } - }); + assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists())); // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled @@ -434,17 +420,14 @@ public void run() { // Now make sure the indexing request finishes successfully disruption.stopDisrupting(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); - PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); - assertTrue(resp.isAcknowledged()); - assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); - IndexResponse docResp = (IndexResponse) docIndexResponse.get(); - assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), - 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded - } + assertBusy(() -> { + assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); + PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); + assertTrue(resp.isAcknowledged()); + assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); + IndexResponse docResp = (IndexResponse) docIndexResponse.get(); + assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), + 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded }); } diff --git a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 07272577b45e4..83a4161ac36f1 100644 --- a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -270,12 +270,7 @@ public void testQueryCache() throws Exception { } indexRandom(true, builders); refresh(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L)); - } - }); + assertBusy(() -> assertThat(client().admin().indices().prepareStats("idx").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), equalTo(0L))); for (int i = 0; i < 10; i++) { assertThat(client().prepareSearch("idx").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get().getHits().getTotalHits(), equalTo((long) numDocs)); diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index dd71a3100f3ea..f05910cb2f5e3 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -376,12 +376,7 @@ public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception { // allocation filtering may not have immediate effect // TODO: we should add an easier to do this. It's too much of a song and dance.. Index index = resolveIndex("test"); - assertBusy(new Runnable() { - @Override - public void run() { - assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex(index)); - } - }); + assertBusy(() -> assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex(index))); // wait for 4 active shards - we should have lost one shard assertFalse(client().admin().cluster().prepareHealth().setWaitForActiveShards(4).get().isTimedOut()); diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index d0dd969d0c775..36d9373b0835c 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -341,12 +341,9 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat } private void refreshAndAssert() throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - RefreshResponse actionGet = client().admin().indices().prepareRefresh().get(); - assertAllSuccessful(actionGet); - } + assertBusy(() -> { + RefreshResponse actionGet = client().admin().indices().prepareRefresh().get(); + assertAllSuccessful(actionGet); }, 5, TimeUnit.MINUTES); } } diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 93eb9b03f0b86..6dc2330ebee55 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -58,7 +58,6 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -386,17 +385,14 @@ public void testCancellationCleansTempFiles() throws Exception { .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none"))); logger.info("--> wait for all replica shards to be removed, on all nodes"); - assertBusy(new Runnable() { - @Override - public void run() { - for (String node : internalCluster().getNodeNames()) { - if (node.equals(p_node)) { - continue; - } - ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); - assertThat(node + " indicates assigned replicas", - state.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); + assertBusy(() -> { + for (String node : internalCluster().getNodeNames()) { + if (node.equals(p_node)) { + continue; } + ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + assertThat(node + " indicates assigned replicas", + state.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); } }); @@ -405,20 +401,17 @@ public void run() { NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, node); for (final Path shardLoc : nodeEnvironment.availableShardPaths(new ShardId(indexName, "_na_", 0))) { if (Files.exists(shardLoc)) { - assertBusy(new Runnable() { - @Override - public void run() { - try { - Files.walkFileTree(shardLoc, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), not(startsWith("recovery."))); - return FileVisitResult.CONTINUE; - } - }); - } catch (IOException e) { - throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e); - } + assertBusy(() -> { + try { + Files.walkFileTree(shardLoc, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), not(startsWith("recovery."))); + return FileVisitResult.CONTINUE; + } + }); + } catch (IOException e) { + throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e); } }); } diff --git a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java index 44b01b3a4b974..21e7e51c38b22 100644 --- a/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java +++ b/core/src/test/java/org/elasticsearch/search/child/ParentFieldLoadingIT.java @@ -30,9 +30,9 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -131,25 +131,22 @@ public void testChangingEagerParentFieldLoadingAtRuntime() throws Exception { .get(); assertAcked(putMappingResponse); Index test = resolveIndex("test"); - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState clusterState = internalCluster().clusterService().state(); - ShardRouting shardRouting = clusterState.routingTable().index("test").shard(0).getShards().get(0); - String nodeName = clusterState.getNodes().get(shardRouting.currentNodeId()).getName(); - - boolean verified = false; - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); - IndexService indexService = indicesService.indexService(test); - if (indexService != null) { - MapperService mapperService = indexService.mapperService(); - DocumentMapper documentMapper = mapperService.documentMapper("child"); - if (documentMapper != null) { - verified = documentMapper.parentFieldMapper().fieldType().eagerGlobalOrdinals(); - } + assertBusy(() -> { + ClusterState clusterState = internalCluster().clusterService().state(); + ShardRouting shardRouting = clusterState.routingTable().index("test").shard(0).getShards().get(0); + String nodeName = clusterState.getNodes().get(shardRouting.currentNodeId()).getName(); + + boolean verified = false; + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + IndexService indexService = indicesService.indexService(test); + if (indexService != null) { + MapperService mapperService = indexService.mapperService(); + DocumentMapper documentMapper = mapperService.documentMapper("child"); + if (documentMapper != null) { + verified = documentMapper.parentFieldMapper().fieldType().eagerGlobalOrdinals(); } - assertTrue(verified); } + assertTrue(verified); }); // Need to add a new doc otherwise the refresh doesn't trigger a new searcher diff --git a/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 60b7c6f8fab7a..132706180ad78 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -189,18 +189,15 @@ public static void unblockNode(final String repository, final String node) { } protected void assertBusyPendingTasks(final String taskPrefix, final int expectedCount) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - PendingClusterTasksResponse tasks = client().admin().cluster().preparePendingClusterTasks().get(); - int count = 0; - for(PendingClusterTask task : tasks) { - if (task.getSource().toString().startsWith(taskPrefix)) { - count++; - } + assertBusy(() -> { + PendingClusterTasksResponse tasks = client().admin().cluster().preparePendingClusterTasks().get(); + int count = 0; + for(PendingClusterTask task : tasks) { + if (task.getSource().toString().startsWith(taskPrefix)) { + count++; } - assertThat(count, greaterThanOrEqualTo(expectedCount)); } + assertThat(count, greaterThanOrEqualTo(expectedCount)); }, 1, TimeUnit.MINUTES); } diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index fee2253665007..df1412e91be12 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -219,19 +219,16 @@ public void testRestoreCustomMetadata() throws Exception { Client client = client(); createIndex("test-idx"); logger.info("--> add custom persistent metadata"); - updateClusterState(new ClusterStateUpdater() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder builder = ClusterState.builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("before_snapshot_s")); - metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("before_snapshot_ns")); - metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("before_snapshot_s_gw")); - metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("before_snapshot_ns_gw")); - metadataBuilder.putCustom(SnapshotableGatewayNoApiMetadata.TYPE, new SnapshotableGatewayNoApiMetadata("before_snapshot_s_gw_noapi")); - builder.metaData(metadataBuilder); - return builder.build(); - } + updateClusterState(currentState -> { + ClusterState.Builder builder = ClusterState.builder(currentState); + MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); + metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("before_snapshot_s")); + metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("before_snapshot_ns")); + metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("before_snapshot_s_gw")); + metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("before_snapshot_ns_gw")); + metadataBuilder.putCustom(SnapshotableGatewayNoApiMetadata.TYPE, new SnapshotableGatewayNoApiMetadata("before_snapshot_s_gw_noapi")); + builder.metaData(metadataBuilder); + return builder.build(); }); logger.info("--> create repository"); @@ -246,27 +243,24 @@ public ClusterState execute(ClusterState currentState) throws Exception { assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); logger.info("--> change custom persistent metadata"); - updateClusterState(new ClusterStateUpdater() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder builder = ClusterState.builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - if (randomBoolean()) { - metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("after_snapshot_s")); - } else { - metadataBuilder.removeCustom(SnapshottableMetadata.TYPE); - } - metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("after_snapshot_ns")); - if (randomBoolean()) { - metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("after_snapshot_s_gw")); - } else { - metadataBuilder.removeCustom(SnapshottableGatewayMetadata.TYPE); - } - metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("after_snapshot_ns_gw")); - metadataBuilder.removeCustom(SnapshotableGatewayNoApiMetadata.TYPE); - builder.metaData(metadataBuilder); - return builder.build(); + updateClusterState(currentState -> { + ClusterState.Builder builder = ClusterState.builder(currentState); + MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); + if (randomBoolean()) { + metadataBuilder.putCustom(SnapshottableMetadata.TYPE, new SnapshottableMetadata("after_snapshot_s")); + } else { + metadataBuilder.removeCustom(SnapshottableMetadata.TYPE); } + metadataBuilder.putCustom(NonSnapshottableMetadata.TYPE, new NonSnapshottableMetadata("after_snapshot_ns")); + if (randomBoolean()) { + metadataBuilder.putCustom(SnapshottableGatewayMetadata.TYPE, new SnapshottableGatewayMetadata("after_snapshot_s_gw")); + } else { + metadataBuilder.removeCustom(SnapshottableGatewayMetadata.TYPE); + } + metadataBuilder.putCustom(NonSnapshottableGatewayMetadata.TYPE, new NonSnapshottableGatewayMetadata("after_snapshot_ns_gw")); + metadataBuilder.removeCustom(SnapshotableGatewayNoApiMetadata.TYPE); + builder.metaData(metadataBuilder); + return builder.build(); }); logger.info("--> delete repository"); @@ -521,15 +515,12 @@ public void testRestoreIndexWithMissingShards() throws Exception { client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2") .setIndices("test-idx-all", "test-idx-none", "test-idx-some") .setWaitForCompletion(false).setPartial(true).execute().actionGet(); - assertBusy(new Runnable() { - @Override - public void run() { - SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get(); - List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); - assertEquals(snapshotStatuses.size(), 1); - logger.trace("current snapshot status [{}]", snapshotStatuses.get(0)); - assertTrue(snapshotStatuses.get(0).getState().completed()); - } + assertBusy(() -> { + SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get(); + List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); + assertEquals(snapshotStatuses.size(), 1); + logger.trace("current snapshot status [{}]", snapshotStatuses.get(0)); + assertTrue(snapshotStatuses.get(0).getState().completed()); }, 1, TimeUnit.MINUTES); SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get(); List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); @@ -542,15 +533,12 @@ public void run() { // There is slight delay between snapshot being marked as completed in the cluster state and on the file system // After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well - assertBusy(new Runnable() { - @Override - public void run() { - GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").get(); - assertThat(response.getSnapshots().size(), equalTo(1)); - SnapshotInfo snapshotInfo = response.getSnapshots().get(0); - assertTrue(snapshotInfo.state().completed()); - assertEquals(SnapshotState.PARTIAL, snapshotInfo.state()); - } + assertBusy(() -> { + GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").get(); + assertThat(response.getSnapshots().size(), equalTo(1)); + SnapshotInfo snapshotInfo = response.getSnapshots().get(0); + assertTrue(snapshotInfo.state().completed()); + assertEquals(SnapshotState.PARTIAL, snapshotInfo.state()); }, 1, TimeUnit.MINUTES); } else { logger.info("checking snapshot completion using wait_for_completion flag"); @@ -795,13 +783,10 @@ public void testMasterShutdownDuringSnapshot() throws Exception { logger.info("--> wait until the snapshot is done"); - assertBusy(new Runnable() { - @Override - public void run() { - GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get(); - SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); - assertTrue(snapshotInfo.state().completed()); - } + assertBusy(() -> { + GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get(); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertTrue(snapshotInfo.state().completed()); }, 1, TimeUnit.MINUTES); logger.info("--> verify that snapshot was succesful"); diff --git a/modules/transport-netty3/src/test/java/org/elasticsearch/transport/netty3/Netty3ScheduledPingTests.java b/modules/transport-netty3/src/test/java/org/elasticsearch/transport/netty3/Netty3ScheduledPingTests.java index b00405f7cad89..02d65bf8fd696 100644 --- a/modules/transport-netty3/src/test/java/org/elasticsearch/transport/netty3/Netty3ScheduledPingTests.java +++ b/modules/transport-netty3/src/test/java/org/elasticsearch/transport/netty3/Netty3ScheduledPingTests.java @@ -108,12 +108,9 @@ public void testScheduledPing() throws Exception { serviceB.connectToNode(nodeA); } - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L)); - assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L)); - } + assertBusy(() -> { + assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L)); + assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L)); }); assertThat(nettyA.getPing().getFailedPings(), equalTo(0L)); assertThat(nettyB.getPing().getFailedPings(), equalTo(0L)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java index 3657074e778c3..df60f84246dda 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java @@ -99,12 +99,9 @@ public void testScheduledPing() throws Exception { serviceB.connectToNode(nodeA); } - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L)); - assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L)); - } + assertBusy(() -> { + assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L)); + assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L)); }); assertThat(nettyA.getPing().getFailedPings(), equalTo(0L)); assertThat(nettyB.getPing().getFailedPings(), equalTo(0L)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 2566901741bd3..c8b5f16979ee5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -30,7 +30,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,6 +49,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; @@ -678,14 +678,14 @@ public static T randomValueOtherThanMany(Predicate input, Supplier ran /** * Runs the code block for 10 seconds waiting for no assertion to trip. */ - public static void assertBusy(Runnable codeBlock) throws Exception { + public static void assertBusy(CheckedRunnable codeBlock) throws Exception { assertBusy(codeBlock, 10, TimeUnit.SECONDS); } /** * Runs the code block for the provided interval, waiting for no assertions to trip. */ - public static void assertBusy(Runnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { + public static void assertBusy(CheckedRunnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit); long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1); long timeInMillis = 1; diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 78bda6efaa180..d1ee0f224280a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -2030,12 +2030,9 @@ public void ensureEstimatedStats() { // in an assertBusy loop, so it will try for 10 seconds and // fail if it never reached 0 try { - assertBusy(new Runnable() { - @Override - public void run() { - CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST); - assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L)); - } + assertBusy(() -> { + CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST); + assertThat("Request breaker not reset to 0 on node: " + name, reqBreaker.getUsed(), equalTo(0L)); }); } catch (Exception e) { fail("Exception during check for request breaker reset to 0: " + e);