Skip to content

Commit

Permalink
Fixed flaky tests under BasicReplicationIT and PauseReplicationIT
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon committed Jan 9, 2022
1 parent 3504db4 commit 3dcc229
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
20 changes: 10 additions & 10 deletions src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)

assertBusy {
assertBusy({
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Ensure force merge on leader doesn't impact replication
for (i in 2..5) {
Expand All @@ -73,13 +73,13 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
assertBusy {
assertBusy({
for (i in 2..10) {
val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}
}, 60L, TimeUnit.SECONDS)

// Force merge on follower however isn't allowed due to WRITE block
Assertions.assertThatThrownBy {
Expand Down Expand Up @@ -131,31 +131,31 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Update document
source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
},60L, TimeUnit.SECONDS)

// Delete document
val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT)
assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isFalse()
}
}, 60L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ import java.util.concurrent.TimeUnit
)
class PauseReplicationIT: MultiClusterRestTestCase() {
private val leaderIndexName = "leader_index"
private val followerIndexName = "paused_index"

fun `test pause replication in following state and empty index`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

val followerIndexName = "pause_index_follow_state"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Expand Down Expand Up @@ -108,6 +107,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
| """.trimMargin())
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val followerIndexName = "pause_index_restore_state"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings),
Expand Down Expand Up @@ -135,6 +135,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {

fun `test pause without replication in progress`() {
val followerClient = getClientForCluster(FOLLOWER)
val followerIndexName = "pause_index_no_repl"
//ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ?
var randomIndex = "random"
val createIndexResponse = followerClient.indices().create(CreateIndexRequest(randomIndex),
Expand All @@ -153,6 +154,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
fun `test pause replication and stop replication`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val followerIndexName = "pause_index_with_stop"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Expand Down Expand Up @@ -182,6 +184,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {

fun `test pause replication when leader cluster is unavailable`() {
val followerClient = getClientForCluster(FOLLOWER)
val followerIndexName = "pause_index_leader_down"
try {
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
Expand Down

0 comments on commit 3dcc229

Please sign in to comment.