Skip to content

Commit

Permalink
Fix RareClusterStateIT Cancelling Publication too Early (elastic#51429)
Browse files Browse the repository at this point in the history
Wait for the cluster to have settled down and have the same accepted version on all nodes before
executing and cancelling request so that a slow CS accept on one node doesn't make it fall behind
and then get sent the full CS because of the diff-version mismatch, breaking the mechanics of this test.

Closes elastic#51308
  • Loading branch information
original-brownbear committed Jan 24, 2020
1 parent bf53ca3 commit 37a238c
Showing 1 changed file with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -142,7 +144,13 @@ private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res
// Wait for no publication in progress to not accidentally cancel a publication different from the one triggered by the given
// request.
assertBusy(
() -> assertFalse(((Coordinator) internalCluster().getCurrentMasterNodeInstance(Discovery.class)).publicationInProgress()));
() -> {
assertFalse(((Coordinator) internalCluster().getCurrentMasterNodeInstance(Discovery.class)).publicationInProgress());
assertThat(StreamSupport.stream(
internalCluster().getInstances(Discovery.class).spliterator(), false)
.map(coordinator -> ((Coordinator) coordinator).getLastAcceptedState().version())
.distinct().toArray(), arrayWithSize(1));
});
ActionFuture<Res> future = req.execute();
assertBusy(
() -> assertTrue(((Coordinator)internalCluster().getCurrentMasterNodeInstance(Discovery.class)).cancelCommittedPublication()));
Expand Down Expand Up @@ -268,11 +276,9 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {

// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(() -> {
assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged());
assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class));
assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal());
});
assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged());
assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class));
assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal());
}

public void testDelayedMappingPropagationOnReplica() throws Exception {
Expand Down Expand Up @@ -373,11 +379,9 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {

// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(() -> {
assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged());
assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class));
assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded
});
assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged());
assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class));
assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded

assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
}
Expand Down

0 comments on commit 37a238c

Please sign in to comment.