Skip to content

Commit

Permalink
adding missing checkpoint and correcting follower stats test case (#1064
Browse files Browse the repository at this point in the history
)

Signed-off-by: sricharanvuppu <[email protected]>
  • Loading branch information
sricharanvuppu authored Jul 12, 2023
1 parent 96299ef commit 324b103
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(
replayRequest.changes.size.toLong()
)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
} catch (e: OpenSearchException) {
if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass)
|| TransportActions.isShardNotAvailableException(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,16 +886,12 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}, 60L, TimeUnit.SECONDS)
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176")

fun `test follower stats`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader"
val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower"
val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader"
val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower"
// val followerIndex2 = "follower_index_2"
// val followerIndex3 = "follower_index_3"
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(
CreateIndexRequest(leaderIndexName),
Expand All @@ -908,12 +904,12 @@ class StartReplicationIT: MultiClusterRestTestCase() {
true
)
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName2, followerIndexName2),
StartReplicationRequest("source", leaderIndexName, followerIndexName2),
TimeValue.timeValueSeconds(10),
true
)
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName3, followerIndexName3),
StartReplicationRequest("source", leaderIndexName, followerIndexName3),
TimeValue.timeValueSeconds(10),
true
)
Expand All @@ -923,12 +919,16 @@ class StartReplicationIT: MultiClusterRestTestCase() {
leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
}
followerClient.pauseReplication(followerIndexName2)
val stats = followerClient.followerStats()
followerClient.stopReplication(followerIndexName3)
var stats = followerClient.followerStats()
assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0")
assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1")
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
assertBusy({
stats = followerClient.followerStats()
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
}, 60, TimeUnit.SECONDS)
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
Expand Down

0 comments on commit 324b103

Please sign in to comment.