From e3fd0316f25db3c06ab42aa00c8b6069146edf7b Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Wed, 24 Apr 2024 22:57:26 +0530 Subject: [PATCH] =?UTF-8?q?Adding=20support=20to=20fetch=20changes=20from?= =?UTF-8?q?=20Lucene=20store=20while=20migrating=20from/to=20r=E2=80=A6=20?= =?UTF-8?q?(#1369)=20(#1375)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Shubh Sahu --- build.gradle | 2 + .../replication/ReplicationPlugin.kt | 2 +- .../changes/TransportGetChangesAction.kt | 20 +- .../replication/util/ValidationUtil.kt | 12 +- .../integ/rest/StartReplicationIT.kt | 208 +++++++++++------- 5 files changed, 155 insertions(+), 89 deletions(-) diff --git a/build.gradle b/build.gradle index da5f35bf..4f159067 100644 --- a/build.gradle +++ b/build.gradle @@ -381,6 +381,8 @@ testClusters { testDistribution = "ARCHIVE" } int debugPort = 5005 + //adding it to test migration + systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true') if (_numNodes > 1) numberOfNodes = _numNodes //numberOfNodes = 3 diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 792ebe27..3790c3c0 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getCustomTranslogDeletionPolicyFactory(): Optional { // We don't need a retention lease translog deletion policy for remote store enabled clusters as // we fetch the operations directly from lucene in such cases. - return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) { + return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) { Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier -> ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier) }) diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt index 392555b5..4e364906 100644 --- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt @@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus indexMetric.lastFetchTime.set(relativeStartNanos) val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id) - val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService) - if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) { + val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService) + if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) { // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller // should catch and start a new poll. @@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced // to the translog, which means we can't return those changes. Return to the caller to retry. // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog - if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) { - assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" } + if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) { + assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" } throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...") } } relativeStartNanos = System.nanoTime() // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo - val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo) + val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo) var ops: List = listOf() - var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false + var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false if(fetchFromTranslog) { try { ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo) @@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus indexMetric.ops.addAndGet(ops.size.toLong()) ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) } - GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) + GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) } } } - private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long { + private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long { // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store // enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to // lastKnownGlobalCheckpoint in such cases. - return if (isRemoteStoreEnabled) { + return if (isRemoteEnabledOrMigrating) { indexShard.lastKnownGlobalCheckpoint } else { indexShard.lastSyncedGlobalCheckpoint @@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator { val shardIt = state.routingTable().shardRoutingTable(request.request().shardId) // Random active shards - return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt() + return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt() else shardIt.activeInitializingShardsRandomIt() } } \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt index 3aad9665..867c431d 100644 --- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt +++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt @@ -17,18 +17,18 @@ import org.opensearch.Version import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.MetadataCreateIndexService -import org.opensearch.core.common.Strings +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.ValidationException import org.opensearch.common.settings.Settings +import org.opensearch.core.common.Strings import org.opensearch.env.Environment import org.opensearch.index.IndexNotFoundException -import java.io.UnsupportedEncodingException -import org.opensearch.cluster.service.ClusterService import org.opensearch.node.Node import org.opensearch.node.remotestore.RemoteStoreNodeAttribute +import org.opensearch.node.remotestore.RemoteStoreNodeService import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING -import org.opensearch.replication.action.changes.TransportGetChangesAction +import java.io.UnsupportedEncodingException import java.nio.file.Files import java.nio.file.Path import java.util.Locale @@ -161,4 +161,8 @@ object ValidationUtil { return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false } + fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean { + return isRemoteStoreEnabledCluster(clusterService) || + clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED) + } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 5d35c62e..f124f373 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -79,6 +79,10 @@ import java.nio.file.Files import java.util.* import java.util.concurrent.TimeUnit import org.opensearch.bootstrap.BootstrapInfo +import org.opensearch.cluster.service.ClusterService +import org.opensearch.index.mapper.Mapping +import org.opensearch.indices.replication.common.ReplicationType +import org.opensearch.replication.util.ValidationUtil @@ -1026,80 +1030,6 @@ class StartReplicationIT: MultiClusterRestTestCase() { .hasMessageContaining("Primary shards in the Index[source:${leaderIndexName}] are not active") } - fun `test that follower index mapping updates when leader index gets multi-field mapping`() { - val followerClient = getClientForCluster(FOLLOWER) - val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - var putMappingRequest = PutMappingRequest(leaderIndexName) - putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\"}}}", XContentType.JSON) - leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - val sourceMap = mapOf("field1" to randomAlphaOfLength(5)) - leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - waitForRestore = true) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - Assert.assertEquals( - leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName], - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) - putMappingRequest = PutMappingRequest(leaderIndexName) - putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\",\"fields\":{\"field2\":{\"type\":\"text\",\"analyzer\":\"standard\"},\"field3\":{\"type\":\"text\",\"analyzer\":\"standard\"}}}}}",XContentType.JSON) - leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName] - TimeUnit.MINUTES.sleep(2) - Assert.assertEquals( - leaderMappings, - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) - } - - fun `test that follower index mapping does not update when only new fields are added but not respective docs in leader index`() { - val followerClient = getClientForCluster(FOLLOWER) - val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - var putMappingRequest = PutMappingRequest(leaderIndexName) - putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"}}}", XContentType.JSON) - leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - val sourceMap = mapOf("name" to randomAlphaOfLength(5)) - leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - waitForRestore = true) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - Assert.assertEquals( - leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName], - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) - putMappingRequest = PutMappingRequest(leaderIndexName) - putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"},\"age\":{\"type\":\"integer\"}}}",XContentType.JSON) - leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName] - TimeUnit.MINUTES.sleep(2) - Assert.assertNotEquals( - leaderMappings, - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) - } - fun `test that wait_for_active_shards setting is set on leader and not on follower`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) @@ -1255,6 +1185,136 @@ class StartReplicationIT: MultiClusterRestTestCase() { followerClient.stopReplication(followerIndexName) } } + fun `test that follower index mapping updates when leader index gets multi-field mapping`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + var putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("field1" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\",\"fields\":{\"field2\":{\"type\":\"text\",\"analyzer\":\"standard\"},\"field3\":{\"type\":\"text\",\"analyzer\":\"standard\"}}}}}",XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + TimeUnit.MINUTES.sleep(2) + Assert.assertEquals( + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + } + + fun `test that follower index mapping does not update when only new fields are added but not respective docs in leader index`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + var putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"},\"age\":{\"type\":\"integer\"}}}",XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + TimeUnit.MINUTES.sleep(2) + Assert.assertNotEquals( + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + } + + fun `test operations are fetched from lucene when leader is in mixed mode`() { + + val leaderClient = getClientForCluster(LEADER) + val followerClient = getClientForCluster(FOLLOWER) + + // create index on leader cluster + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build() + val createIndexResponse = leaderClient.indices().create( + CreateIndexRequest(leaderIndexName).settings(settings), + RequestOptions.DEFAULT + ) + assertThat(createIndexResponse.isAcknowledged).isTrue() + + // Update leader cluster settings to enable mixed mode and set migration direction to remote_store + val leaderClusterUpdateSettingsRequest = Request("PUT", "_cluster/settings") + val entityAsString = """ + { + "persistent": { + "remote_store.compatibility_mode": "mixed", + "migration.direction" : "remote_store" + } + }""".trimMargin() + + leaderClusterUpdateSettingsRequest.entity = NStringEntity(entityAsString,ContentType.APPLICATION_JSON) + val updateSettingResponse = leaderClient.lowLevelClient.performRequest(leaderClusterUpdateSettingsRequest) + assertEquals(HttpStatus.SC_OK.toLong(), updateSettingResponse.statusLine.statusCode.toLong()) + + //create connection and start replication + createConnectionBetweenClusters(FOLLOWER, LEADER) + + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true + ) + + //Index documents on leader index + val docCount = 50 + for (i in 1..docCount) { + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) + } + + // Verify that all the documents are replicated to follower index and are fetched from lucene + assertBusy({ + val stats = leaderClient.leaderStats() + assertThat(stats.size).isEqualTo(9) + assertThat(stats.getValue("num_replicated_indices").toString()).isEqualTo("1") + assertThat(stats.getValue("operations_read").toString()).isEqualTo(docCount.toString()) + assertThat(stats.getValue("operations_read_lucene").toString()).isEqualTo(docCount.toString()) + assertThat(stats.getValue("operations_read_translog").toString()).isEqualTo("0") + assertThat(stats.containsKey("index_stats")) + }, 60L, TimeUnit.SECONDS) + } private fun excludeAllClusterNodes(clusterName: String) { val transientSettingsRequest = Request("PUT", "_cluster/settings")