diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index 7ccf8c94..b274bdf0 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -113,6 +113,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet( replayRequest.changes.size.toLong() ) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } catch (e: OpenSearchException) { if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass) || TransportActions.isShardNotAvailableException(e) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 53493457..d4737c61 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -886,16 +886,12 @@ class StartReplicationIT: MultiClusterRestTestCase() { }, 60L, TimeUnit.SECONDS) } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176") + fun `test follower stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" - val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" -// val followerIndex2 = "follower_index_2" -// val followerIndex3 = "follower_index_3" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create( CreateIndexRequest(leaderIndexName), @@ -908,12 +904,12 @@ class StartReplicationIT: MultiClusterRestTestCase() { true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName2, followerIndexName2), + StartReplicationRequest("source", leaderIndexName, followerIndexName2), TimeValue.timeValueSeconds(10), true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName3, followerIndexName3), + StartReplicationRequest("source", leaderIndexName, followerIndexName3), TimeValue.timeValueSeconds(10), true ) @@ -923,12 +919,16 @@ class StartReplicationIT: MultiClusterRestTestCase() { leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) } followerClient.pauseReplication(followerIndexName2) - val stats = followerClient.followerStats() + followerClient.stopReplication(followerIndexName3) + var stats = followerClient.followerStats() assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0") assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1") - assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + assertBusy({ + stats = followerClient.followerStats() + assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + }, 60, TimeUnit.SECONDS) assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")