From 7160f1f913223908296e3e4daea0986880ba017e Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 17 Jan 2019 08:46:33 +0100 Subject: [PATCH 1/2] Wait for operations to complete before refresh --- .../elasticsearch/recovery/RelocationIT.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 45f0fce3b8143..f026081af1925 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -552,7 +553,7 @@ public void testRelocateWhileWaitingForRefresh() { assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); } - public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { logger.info("--> starting [node1] ..."); final String node1 = internalCluster().startNode(); @@ -570,9 +571,11 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { logger.info("--> flush so we have an actual index"); client().admin().indices().prepareFlush().execute().actionGet(); logger.info("--> index more docs so we have something in the translog"); + final List> pendingIndexResponses = new ArrayList<>(); for (int i = 10; i < 20; i++) { - client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i).execute(); + pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute()); } logger.info("--> start another node"); @@ -587,8 +590,9 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { .execute(); logger.info("--> index 100 docs while relocating"); for (int i = 20; i < 120; i++) { - client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i).execute(); + pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute()); } relocationListener.actionGet(); clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) @@ -596,8 +600,11 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> verifying count"); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); } class RecoveryCorruption implements StubbableTransport.SendRequestBehavior { From 2f64bdddd97efebc5857d248099c0ce04b3720d6 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 17 Jan 2019 09:59:54 +0100 Subject: [PATCH 2/2] move search out of assertBusy --- .../src/test/java/org/elasticsearch/recovery/RelocationIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index f026081af1925..fb455f37d76f3 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -602,9 +602,10 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E logger.info("--> verifying count"); assertBusy(() -> { client().admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); + + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); } class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {