From 3dcc2297f06ee51f75c707a09184db48b2bf6eaa Mon Sep 17 00:00:00 2001 From: Sai Kumar Date: Sun, 9 Jan 2022 21:53:44 +0530 Subject: [PATCH] Fixed flaky tests under BasicReplicationIT and PauseReplicationIT Signed-off-by: Sai Kumar --- .../replication/BasicReplicationIT.kt | 20 +++++++++---------- .../integ/rest/PauseReplicationIT.kt | 7 +++++-- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt index a3b9e7f09..7d426ace7 100644 --- a/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt @@ -57,11 +57,11 @@ class BasicReplicationIT : MultiClusterRestTestCase() { var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).isEqualTo(Result.CREATED) - assertBusy { + assertBusy({ val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - } + }, 60L, TimeUnit.SECONDS) // Ensure force merge on leader doesn't impact replication for (i in 2..5) { @@ -73,13 +73,13 @@ class BasicReplicationIT : MultiClusterRestTestCase() { response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT) assertThat(response.result).isEqualTo(Result.CREATED) } - assertBusy { + assertBusy({ for (i in 2..10) { val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) } - } + }, 60L, TimeUnit.SECONDS) // Force merge on follower however isn't allowed due to WRITE block Assertions.assertThatThrownBy { @@ -131,31 +131,31 @@ class BasicReplicationIT : MultiClusterRestTestCase() { var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED) - assertBusy { + assertBusy({ val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - } + }, 60L, TimeUnit.SECONDS) // Update document source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED) - assertBusy { + assertBusy({ val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - } + },60L, TimeUnit.SECONDS) // Delete document val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT) assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED) - assertBusy { + assertBusy({ val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isFalse() - } + }, 60L, TimeUnit.SECONDS) } finally { followerClient.stopReplication(followerIndexName) } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt index 568e08f12..dbb6bcb88 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt @@ -46,12 +46,11 @@ import java.util.concurrent.TimeUnit ) class PauseReplicationIT: MultiClusterRestTestCase() { private val leaderIndexName = "leader_index" - private val followerIndexName = "paused_index" fun `test pause replication in following state and empty index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - + val followerIndexName = "pause_index_follow_state" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) @@ -108,6 +107,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { | """.trimMargin()) val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) + val followerIndexName = "pause_index_restore_state" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), @@ -135,6 +135,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause without replication in progress`() { val followerClient = getClientForCluster(FOLLOWER) + val followerIndexName = "pause_index_no_repl" //ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ? var randomIndex = "random" val createIndexResponse = followerClient.indices().create(CreateIndexRequest(randomIndex), @@ -153,6 +154,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause replication and stop replication`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) + val followerIndexName = "pause_index_with_stop" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) @@ -182,6 +184,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause replication when leader cluster is unavailable`() { val followerClient = getClientForCluster(FOLLOWER) + val followerIndexName = "pause_index_leader_down" try { val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER)