From 1af6b1ccd7bae65fe2b64ce9ed6efff14dd548b9 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Thu, 6 Jun 2024 11:38:25 +0530 Subject: [PATCH] Revert "Update checkpoint from remote nodes replicas" This reverts commit a4f00d208a537e1af6e1a5d220bc66e3e0c604d3. --- .../MigrationBaseTestCase.java | 3 +- .../RemotePrimaryRelocationIT.java | 40 +++++++--- .../RemoteReplicaRecoveryIT.java | 79 +++++++++++++------ .../opensearch/index/shard/IndexShard.java | 1 + .../SegmentReplicationTargetService.java | 2 +- .../replication/common/ReplicationTarget.java | 4 +- 6 files changed, 88 insertions(+), 41 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index b65f6f056aae6..0493bcf800c97 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -186,11 +186,10 @@ private Thread getIndexingThread() { indexSingleDoc(indexName); long currentDocCount = indexedDocs.incrementAndGet(); if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { + logger.info("--> [iteration {}] flushing index", currentDocCount); if (rarely()) { - logger.info("--> [iteration {}] flushing index", currentDocCount); client().admin().indices().prepareFlush(indexName).get(); } else { - logger.info("--> [iteration {}] refreshing index", currentDocCount); client().admin().indices().prepareRefresh(indexName).get(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index cea653c0ead4b..293691ace2edd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -8,11 +8,14 @@ package org.opensearch.remotemigration; +import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; @@ -63,8 +66,8 @@ public void testRemotePrimaryRelocation() throws Exception { AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); - asyncIndexingService.startIndexing(); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); + refresh("test"); // add remote node in mixed mode cluster @@ -138,19 +141,17 @@ public void testRemotePrimaryRelocation() throws Exception { logger.info("--> relocation from remote to remote complete"); finished.set(true); - asyncIndexingService.stopIndexing(); + indexingThread.join(); refresh("test"); - OpenSearchAssertions.assertHitCount( - client().prepareSearch("test").setTrackTotalHits(true).get(), - asyncIndexingService.getIndexedDocs() - ); + OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) .get(), - asyncIndexingService.getIndexedDocs() + numAutoGenDocs.get() ); + } public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { @@ -164,8 +165,9 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); - asyncIndexingService.startIndexing(); + AtomicInteger numAutoGenDocs = new AtomicInteger(); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); refresh("test"); @@ -207,11 +209,27 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { assertEquals(actionGet.getRelocatingShards(), 0); assertEquals(docRepNode, primaryNodeName("test")); - asyncIndexingService.stopIndexing(); + finished.set(true); + indexingThread.join(); client().admin() .cluster() .prepareUpdateSettings() .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) .get(); } + + private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 10_000) { + IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("test").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + } + }); + indexingThread.start(); + return indexingThread; + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java index 7270341202990..196ecb991bbc0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java @@ -8,27 +8,32 @@ package org.opensearch.remotemigration; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; + +import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) + public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase { protected int maximumNumberOfShards() { @@ -58,8 +63,10 @@ public void testReplicaRecovery() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); String replicaNode = internalCluster().startNode(); ensureGreen("test"); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); - asyncIndexingService.startIndexing(); + + AtomicInteger numAutoGenDocs = new AtomicInteger(); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = getThread(finished, numAutoGenDocs); refresh("test"); @@ -71,10 +78,12 @@ public void testReplicaRecovery() throws Exception { updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - internalCluster().startNode(); + String remoteNode2 = internalCluster().startNode(); internalCluster().validateClusterFormed(); // identify the primary + + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode); client().admin() .cluster() @@ -93,6 +102,7 @@ public void testReplicaRecovery() throws Exception { assertEquals(0, clusterHealthResponse.getRelocatingShards()); logger.info("--> relocation of primary from docrep to remote complete"); + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> getting up the new replicas now to doc rep node as well as remote node "); // Increase replica count to 3 @@ -119,33 +129,52 @@ public void testReplicaRecovery() throws Exception { logger.info("--> replica is up now on another docrep now as well as remote node"); assertEquals(0, clusterHealthResponse.getRelocatingShards()); - asyncIndexingService.stopIndexing(); - refresh("test"); - // segrep lag should be zero - assertBusy(() -> { - SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() - .indices() - .prepareSegmentReplicationStats("test") - .setDetailed(true) - .execute() - .actionGet(); - SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0); - assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); - perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0)); - }, 20, TimeUnit.SECONDS); + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); - OpenSearchAssertions.assertHitCount( - client().prepareSearch("test").setTrackTotalHits(true).get(), - asyncIndexingService.getIndexedDocs() - ); + // Stop replicas on docrep now. + // ToDo : Remove once we have dual replication enabled + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode) + .build() + ) + ) + .get(); + + finished.set(true); + indexingThread.join(); + refresh("test"); + OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) + // .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2)) .get(), - asyncIndexingService.getIndexedDocs() + numAutoGenDocs.get() ); } + + private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 100) { + IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("test").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + logger.info("Indexed {} docs here", numAutoGenDocs.get()); + } + }); + indexingThread.start(); + return indexingThread; + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 49cb710c915fc..3517579856d43 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -523,6 +523,7 @@ public boolean shouldSeedRemoteStore() { public Function isShardOnRemoteEnabledNode = nodeId -> { DiscoveryNode node = discoveryNodes.get(nodeId); if (node != null) { + logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode()); return node.isRemoteStoreNode(); } return false; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index f6ed113019897..fbd7ab7cea346 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { // Update replication checkpoint on source via transport call only supported for remote store integration. For node- // node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call - if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) { + if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) { return; } ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard(); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 76401eaabbf39..aac59df4f6573 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); store.incRef(); - if (indexShard.indexSettings().isAssignedOnRemoteNode()) { + if (indexShard.indexSettings().isRemoteStoreEnabled()) { indexShard.remoteStore().incRef(); } } @@ -284,7 +284,7 @@ protected void closeInternal() { try { store.decRef(); } finally { - if (indexShard.indexSettings().isAssignedOnRemoteNode()) { + if (indexShard.indexSettings().isRemoteStoreEnabled()) { indexShard.remoteStore().decRef(); } }