diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 45f0fce3b8143..fb455f37d76f3 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -552,7 +553,7 @@ public void testRelocateWhileWaitingForRefresh() { assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); } - public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { logger.info("--> starting [node1] ..."); final String node1 = internalCluster().startNode(); @@ -570,9 +571,11 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { logger.info("--> flush so we have an actual index"); client().admin().indices().prepareFlush().execute().actionGet(); logger.info("--> index more docs so we have something in the translog"); + final List> pendingIndexResponses = new ArrayList<>(); for (int i = 10; i < 20; i++) { - client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i).execute(); + pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute()); } logger.info("--> start another node"); @@ -587,8 +590,9 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { .execute(); logger.info("--> index 100 docs while relocating"); for (int i = 20; i < 120; i++) { - client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i).execute(); + pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute()); } relocationListener.actionGet(); clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) @@ -596,7 +600,11 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> verifying count"); - client().admin().indices().prepareRefresh().execute().actionGet(); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); }