Skip to content

Commit

Permalink
[Backport] Remove dependency on jcenter and fixed flaky tests (#292)
Browse files Browse the repository at this point in the history
* [Backport] Remove dependency on jcenter

Signed-off-by: Sai Kumar <[email protected]>

* [Backport] Fixed flaky tests under BasicReplicationIT and PauseReplicationIT (#282)

Signed-off-by: Sai Kumar <[email protected]>

* Corrected build workflow to use release version of OpenSearch-1.1.0

Signed-off-by: Sai Kumar <[email protected]>

* Fix flaky test under StartReplicationIT and ResumeReplicationIT

Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon authored Jan 13, 2022
1 parent 53ffede commit 521223e
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 48 deletions.
23 changes: 1 addition & 22 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,12 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 14
# dependencies: OpenSearch
- name: Checkout OpenSearch
uses: actions/checkout@v2
with:
repository: 'opensearch-project/OpenSearch'
path: OpenSearch
ref: '1.1'
- name: Build OpenSearch
working-directory: ./OpenSearch
run: |
./gradlew publishToMavenLocal -Dbuild.snapshot=true
# dependencies: common-utils
- name: Checkout common-utils
uses: actions/checkout@v2
with:
ref: 'main'
repository: 'opensearch-project/common-utils'
path: common-utils
- name: Build common-utils
working-directory: ./common-utils
run: ./gradlew publishToMavenLocal -Dbuild.snapshot=true -Dopensearch.version=1.1.0-SNAPSHOT
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
- name: Build and run Replication tests
run: |
./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.1.0-SNAPSHOT
./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.1.0
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ repositories {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
jcenter()
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
}

compileKotlin {
Expand Down
20 changes: 10 additions & 10 deletions src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)

assertBusy {
assertBusy({
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Ensure force merge on leader doesn't impact replication
for (i in 2..5) {
Expand All @@ -73,13 +73,13 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
assertBusy {
assertBusy({
for (i in 2..10) {
val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}
}, 60L, TimeUnit.SECONDS)

// Force merge on follower however isn't allowed due to WRITE block
Assertions.assertThatThrownBy {
Expand Down Expand Up @@ -131,31 +131,31 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Update document
source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
},60L, TimeUnit.SECONDS)

// Delete document
val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT)
assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isFalse()
}
}, 60L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.replication.resumeReplication
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.opensearch.replication.updateReplication
import org.opensearch.replication.getShardReplicationTasks
import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
Expand All @@ -46,12 +47,11 @@ import java.util.concurrent.TimeUnit
)
class PauseReplicationIT: MultiClusterRestTestCase() {
private val leaderIndexName = "leader_index"
private val followerIndexName = "paused_index"

fun `test pause replication in following state and empty index`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

val followerIndexName = "pause_index_follow_state"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Expand Down Expand Up @@ -108,6 +108,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
| """.trimMargin())
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val followerIndexName = "pause_index_restore_state"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings),
Expand All @@ -128,13 +129,20 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
followerClient.pauseReplication(followerIndexName)
}.isInstanceOf(ResponseException::class.java)
.hasMessageContaining("Index is in restore phase currently for index: ${followerIndexName}")
// wait for the shard tasks to be up as the replication block is added before adding shard replication tasks
// During intermittent test failures, stop replication under finally block executes before this without removing
// replication block (even though next call to _stop replication API can succeed in removing this block).
assertBusy({
assertTrue(followerClient.getShardReplicationTasks(followerIndexName).isNotEmpty())
}, 30L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test pause without replication in progress`() {
val followerClient = getClientForCluster(FOLLOWER)
val followerIndexName = "pause_index_no_repl"
//ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ?
var randomIndex = "random"
val createIndexResponse = followerClient.indices().create(CreateIndexRequest(randomIndex),
Expand All @@ -153,6 +161,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
fun `test pause replication and stop replication`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val followerIndexName = "pause_index_with_stop"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Expand Down Expand Up @@ -182,6 +191,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {

fun `test pause replication when leader cluster is unavailable`() {
val followerClient = getClientForCluster(FOLLOWER)
val followerIndexName = "pause_index_leader_down"
try {
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {
followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT)
.mappings()[followerIndexName]
)
}, 5, TimeUnit.SECONDS)
}, 60L, TimeUnit.SECONDS)

} finally {
followerClient.stopReplication(followerIndexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,18 +1066,19 @@ class StartReplicationIT: MultiClusterRestTestCase() {
followerClient.pauseReplication(followerIndex2)
followerClient.stopReplication(followerIndex3)


val stats = followerClient.followerStats()
assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0")
assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1")
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
assertThat(stats.containsKey("index_stats"))
assertThat(stats.size).isEqualTo(16)
assertBusy({
val stats = followerClient.followerStats()
assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0")
assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1")
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
assertThat(stats.containsKey("index_stats"))
assertThat(stats.size).isEqualTo(16)
}, 60L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
followerClient.stopReplication(followerIndex2)
Expand Down

0 comments on commit 521223e

Please sign in to comment.