From 3e5e2eec1f74f6c691d542753f03523dd1f8f4eb Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 16 Nov 2022 20:06:54 +0530 Subject: [PATCH] Incorporate PR review feedback Signed-off-by: Ashish Singh --- ... LocalStorePeerRecoverySourceHandler.java} | 10 ++++--- .../recovery/PeerRecoverySourceService.java | 3 +- .../recovery/RecoverySourceHandler.java | 2 +- .../RecoverySourceHandlerFactory.java | 8 ++++-- ...RemoteStorePeerRecoverySourceHandler.java} | 13 ++++----- ...lStorePeerRecoverySourceHandlerTests.java} | 28 +++++++++---------- ...eStorePeerRecoverySourceHandlerTests.java} | 2 +- .../index/shard/IndexShardTestCase.java | 2 +- 8 files changed, 35 insertions(+), 33 deletions(-) rename server/src/main/java/org/opensearch/indices/recovery/{DefaultRecoverySourceHandler.java => LocalStorePeerRecoverySourceHandler.java} (97%) rename server/src/main/java/org/opensearch/indices/recovery/{RemoteStoreReplicaRecoverySourceHandler.java => RemoteStorePeerRecoverySourceHandler.java} (88%) rename server/src/test/java/org/opensearch/indices/recovery/{DefaultRecoverySourceHandlerTests.java => LocalStorePeerRecoverySourceHandlerTests.java} (97%) rename server/src/test/java/org/opensearch/indices/recovery/{RemoteStoreReplicaRecoverySourceHandlerTests.java => RemoteStorePeerRecoverySourceHandlerTests.java} (96%) diff --git a/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java similarity index 97% rename from server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java rename to server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index b7082b61f54f0..9ffe61208b78c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -36,12 +36,14 @@ import java.util.function.Consumer; /** - * This handler is used for the peer recovery when there is no remote store available for segments/translogs. TODO - - * Add more details as this is refactored further. + * This handler is used for node-to-node peer recovery when the recovery target is a replica/ or a relocating primary + * shard with translog backed by local store. + * + * @opensearch.internal */ -public class DefaultRecoverySourceHandler extends RecoverySourceHandler { +public class LocalStorePeerRecoverySourceHandler extends RecoverySourceHandler { - public DefaultRecoverySourceHandler( + public LocalStorePeerRecoverySourceHandler( IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java index c88da80e3c2d0..8bea14a1a1c86 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java @@ -324,7 +324,6 @@ void awaitEmpty() { private final class ShardRecoveryContext { final Map recoveryHandlers = new HashMap<>(); - private final RecoverySourceHandlerFactory recoverySourceHandlerFactory = new RecoverySourceHandlerFactory(); /** * Adds recovery source handler. @@ -379,7 +378,7 @@ private Tuple createRecovery recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime) ); - handler = recoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings); + handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings); return Tuple.tuple(handler, recoveryTarget); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 7acb7cfe72060..03d1066ae5a60 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -120,7 +120,7 @@ public abstract class RecoverySourceHandler { public static final String PEER_RECOVERY_NAME = "peer-recovery"; private final SegmentFileTransferHandler transferHandler; - public RecoverySourceHandler( + RecoverySourceHandler( IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java index 347c2079f7a95..ea13ca18bbfca 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -12,10 +12,12 @@ /** * Factory that supplies {@link RecoverySourceHandler}. + * + * @opensearch.internal */ public class RecoverySourceHandlerFactory { - public RecoverySourceHandler create( + public static RecoverySourceHandler create( IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, @@ -23,7 +25,7 @@ public RecoverySourceHandler create( ) { boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false && shard.isRemoteTranslogEnabled(); if (isReplicaRecoveryWithRemoteTranslog) { - return new RemoteStoreReplicaRecoverySourceHandler( + return new RemoteStorePeerRecoverySourceHandler( shard, recoveryTarget, shard.getThreadPool(), @@ -33,7 +35,7 @@ public RecoverySourceHandler create( recoverySettings.getMaxConcurrentOperations() ); } else { - return new DefaultRecoverySourceHandler( + return new LocalStorePeerRecoverySourceHandler( shard, recoveryTarget, shard.getThreadPool(), diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java similarity index 88% rename from server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java rename to server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java index 6477c16dd92e4..ff218ef71e397 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java @@ -26,10 +26,12 @@ /** * This handler is used when peer recovery target is a remote store enabled replica. + * + * @opensearch.internal */ -public class RemoteStoreReplicaRecoverySourceHandler extends RecoverySourceHandler { +public class RemoteStorePeerRecoverySourceHandler extends RecoverySourceHandler { - public RemoteStoreReplicaRecoverySourceHandler( + public RemoteStorePeerRecoverySourceHandler( IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, @@ -44,11 +46,8 @@ public RemoteStoreReplicaRecoverySourceHandler( @Override protected void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException { // A replica of an index with remote translog does not require the translogs locally and keeps receiving the - // updated segments file on refresh, flushes, and merges. We plan to make the existing replication call to - // just no-op for primary term validation. Hence, there is essentially no writing to the indexWriter as well - // as the translogs locally. In recovery, we will resort to file-based recovery and leave the buffered writes - // to lucene (which are yet to be flushed as segments). Subsequent segment replication will take care of syncing - // the refreshed segment files to the replica. + // updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed + // and there is no translog replay done. final StepListener sendFileStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java similarity index 97% rename from server/src/test/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandlerTests.java rename to server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 665b744deb8f3..7761f97769440 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -144,9 +144,9 @@ import static org.mockito.Mockito.when; /** - * This covers test cases for {@link RecoverySourceHandler} and {@link DefaultRecoverySourceHandler}. + * This covers test cases for {@link RecoverySourceHandler} and {@link LocalStorePeerRecoverySourceHandler}. */ -public class DefaultRecoverySourceHandlerTests extends OpenSearchTestCase { +public class LocalStorePeerRecoverySourceHandlerTests extends OpenSearchTestCase { private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( "index", Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() @@ -218,7 +218,7 @@ public void writeFileChunk( }); } }; - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -299,7 +299,7 @@ public void indexTranslogOperations( listener.onResponse(checkpointOnTarget.get()); } }; - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, @@ -362,7 +362,7 @@ public void indexTranslogOperations( } } }; - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, @@ -436,7 +436,7 @@ public void indexTranslogOperations( Randomness.shuffle(operations); List skipOperations = randomSubsetOf(operations); Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations); - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -555,7 +555,7 @@ public void writeFileChunk( failedEngine.set(true); return null; }).when(mockShard).failShard(any(), any()); - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -630,7 +630,7 @@ public void writeFileChunk( failedEngine.set(true); return null; }).when(mockShard).failShard(any(), any()); - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -683,7 +683,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); - final RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, mock(RecoveryTargetHandler.class), threadPool, @@ -795,7 +795,7 @@ public void writeFileChunk( }; final int maxConcurrentChunks = between(1, 8); final int chunkSize = between(1, 32); - final RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, recoveryTarget, threadPool, @@ -868,7 +868,7 @@ public void writeFileChunk( }; final int maxConcurrentChunks = between(1, 4); final int chunkSize = between(1, 16); - final RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor), threadPool, @@ -976,7 +976,7 @@ public void cleanFiles( } }; final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest(); - final RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, recoveryTarget, threadPool, @@ -1015,7 +1015,7 @@ void createRetentionLease(long startingSeqNo, ActionListener lis public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception { IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - RecoverySourceHandler handler = new DefaultRecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new TestRecoveryTargetHandler(), threadPool, @@ -1070,7 +1070,7 @@ private Store newStore(Path path) throws IOException { } private Store newStore(Path path, boolean checkIndex) throws IOException { - BaseDirectoryWrapper baseDirectoryWrapper = DefaultRecoverySourceHandlerTests.newFSDirectory(path); + BaseDirectoryWrapper baseDirectoryWrapper = LocalStorePeerRecoverySourceHandlerTests.newFSDirectory(path); if (checkIndex == false) { baseDirectoryWrapper.setCheckIndexOnClose(false); // don't run checkindex we might corrupt the index in these tests } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java similarity index 96% rename from server/src/test/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandlerTests.java rename to server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index cd07557ef8045..91953d4db3495 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -16,7 +16,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.common.ReplicationType; -public class RemoteStoreReplicaRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase { +public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ea98e0cd66fe9..f874ab44d9d3b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -875,7 +875,7 @@ protected final void recoverUnstartedReplica( new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes)); - final RecoverySourceHandler recovery = new RecoverySourceHandlerFactory().create( + final RecoverySourceHandler recovery = RecoverySourceHandlerFactory.create( primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request,