diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index ea1604c16190b..0da1687f20769 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -902,7 +902,7 @@ private boolean invariant() {
         if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
-                if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
+                if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) {
                     assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard ["
                         + shardRouting
                         + "] in "
                         + retentionLeases;
                     assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
diff --git a/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java
index f940949d56aeb..b7082b61f54f0 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/DefaultRecoverySourceHandler.java
@@ -161,7 +161,7 @@ && isTargetSameHistory()
             deleteRetentionLeaseStep.whenComplete(ignored -> {
                 assert Transports.assertNotTransportThread(this + "[phase1]");
-                phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep);
+                phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep, false);
             }, onFailure);
         } catch (final Exception e) {
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
index 4bbd17bfb6848..7acb7cfe72060 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
@@ -346,7 +346,13 @@ static final class SendFileResult {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) {
+    void phase1(
+        IndexCommit snapshot,
+        long startingSeqNo,
+        IntSupplier translogOps,
+        ActionListener listener,
+        boolean skipCreateRetentionLeaseStep
+    ) {
         cancellableThreads.checkForCancel();
         final Store store = shard.store();
         try {
@@ -444,7 +450,12 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
                 listener::onFailure
             );
-            sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
+            // When doing peer recovery of remote store enabled replica, retention leases are not required.
+            if (skipCreateRetentionLeaseStep) {
+                sendFilesStep.whenComplete(r -> createRetentionLeaseStep.onResponse(null), listener::onFailure);
+            } else {
+                sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
+            }
             createRetentionLeaseStep.whenComplete(retentionLease -> {
                 final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java
index 88a84db869ad0..6477c16dd92e4 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java
@@ -74,7 +74,7 @@ protected void innerRecoveryToTarget(ActionListener listener,
             onSendFileStepComplete(sendFileStep, wrappedSafeCommit, releaseStore);
             assert Transports.assertNotTransportThread(this + "[phase1]");
-            phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep);
+            phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep, true);
         } catch (final Exception e) {
             throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
         }
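The change above skips lease creation without disturbing the rest of the recovery chain: the retention-lease step is still completed, just with a null response, so the downstream listeners run as before. A rough, self-contained analogy of that wiring using plain java.util.concurrent (not OpenSearch's StepListener; the step names are illustrative only):

```java
import java.util.concurrent.CompletableFuture;

public class SkipRetentionLeaseStepSketch {
    public static void main(String[] args) {
        // true mirrors RemoteStoreReplicaRecoverySourceHandler, false the default handler.
        boolean skipCreateRetentionLeaseStep = true;

        CompletableFuture<Void> sendFilesStep = CompletableFuture.completedFuture(null);
        CompletableFuture<String> createRetentionLeaseStep = new CompletableFuture<>();

        // Same shape as the wiring in phase1: once the file copy finishes, either resolve the
        // lease step immediately with null or perform the real lease creation.
        sendFilesStep.thenRun(() -> {
            if (skipCreateRetentionLeaseStep) {
                createRetentionLeaseStep.complete(null);
            } else {
                createRetentionLeaseStep.complete("peer-recovery retention lease"); // stand-in for createRetentionLease(...)
            }
        });

        // The downstream step runs either way; in the remote-store case it simply sees no lease.
        createRetentionLeaseStep.thenAccept(lease -> System.out.println("continue recovery, lease = " + lease));
    }
}
```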
diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
index 5d317693e02df..f57cfbd9c6d07 100644
--- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
@@ -146,20 +147,23 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
             null
         );
         shards.addReplica(newReplicaShard);
+        AtomicBoolean assertDone = new AtomicBoolean(false);
         shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
             @Override
             public IndexShard indexShard() {
                 IndexShard idxShard = super.indexShard();
-                // verify the starting sequence number while recovering a failed shard which has a valid last commit
-                long startingSeqNo = -1;
-                try {
-                    startingSeqNo = Long.parseLong(
-                        idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO)
-                    );
-                } catch (IOException e) {
-                    Assert.fail();
+                if (assertDone.compareAndSet(false, true)) {
+                    // verify the starting sequence number while recovering a failed shard which has a valid last commit
+                    long startingSeqNo = -1;
+                    try {
+                        startingSeqNo = Long.parseLong(
+                            idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO)
+                        );
+                    } catch (IOException e) {
+                        Assert.fail();
+                    }
+                    assertEquals(numDocs - 1, startingSeqNo);
                 }
-                assertEquals(numDocs - 1, startingSeqNo);
                 return idxShard;
             }
         });
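The assertion is now guarded by assertDone because indexShard() can be invoked more than once during the recovery, and presumably only the first call observes the pre-recovery commit that the check relies on. A minimal, self-contained illustration of that run-once idiom (plain JDK, nothing OpenSearch-specific):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RunOnceSketch {
    private static final AtomicBoolean assertDone = new AtomicBoolean(false);

    static void onIndexShard() {
        // compareAndSet flips false -> true exactly once, even under concurrent calls,
        // so the guarded block runs only on the first invocation.
        if (assertDone.compareAndSet(false, true)) {
            System.out.println("first call: run the commit-based assertion here");
        }
    }

    public static void main(String[] args) {
        onIndexShard(); // runs the guarded block
        onIndexShard(); // skipped
    }
}
```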
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java
index ff7dfbef9172d..67ac735463293 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -691,9 +691,15 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
             ) {
                 @Override
-                void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) {
+                void phase1(
+                    IndexCommit snapshot,
+                    long startingSeqNo,
+                    IntSupplier translogOps,
+                    ActionListener listener,
+                    boolean skipCreateRetentionLeaseStep
+                ) {
                     phase1Called.set(true);
-                    super.phase1(snapshot, startingSeqNo, translogOps, listener);
+                    super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
                 }
                 @Override
@@ -993,7 +999,7 @@ void createRetentionLease(long startingSeqNo, ActionListener lis
         final StepListener phase1Listener = new StepListener<>();
         try {
             final CountDownLatch latch = new CountDownLatch(1);
-            handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch));
+            handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch), false);
             latch.await();
             phase1Listener.result();
         } catch (Exception e) {
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 953b6dfbb2f7b..ea98e0cd66fe9 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -67,6 +67,7 @@
 import org.opensearch.common.lucene.uid.Versions;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.common.util.BigArrays;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.internal.io.IOUtils;
@@ -99,12 +100,12 @@
 import org.opensearch.indices.breaker.CircuitBreakerService;
 import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
 import org.opensearch.indices.recovery.AsyncRecoveryTarget;
-import org.opensearch.indices.recovery.DefaultRecoverySourceHandler;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoveryFailedException;
 import org.opensearch.indices.recovery.RecoveryResponse;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoverySourceHandler;
+import org.opensearch.indices.recovery.RecoverySourceHandlerFactory;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.recovery.RecoveryTarget;
 import org.opensearch.indices.recovery.StartRecoveryRequest;
@@ -133,8 +134,8 @@
 import org.opensearch.transport.TransportService;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -862,17 +863,23 @@ protected final void recoverUnstartedReplica(
             recoveryTarget,
             startingSeqNo
         );
-        int fileChunkSizeInBytes = Math.toIntExact(
-            randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024)
+        long fileChunkSizeInBytes = randomBoolean()
+            ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
+            : randomIntBetween(1, 10 * 1024 * 1024);
+        final Settings settings = Settings.builder()
+            .put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
+            .put("indices.recovery.max_concurrent_operations", Integer.toString(between(1, 4)))
+            .build();
+        RecoverySettings recoverySettings = new RecoverySettings(
+            settings,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
         );
-        final RecoverySourceHandler recovery = new DefaultRecoverySourceHandler(
+        recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes));
+        final RecoverySourceHandler recovery = new RecoverySourceHandlerFactory().create(
             primary,
             new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
-            threadPool,
             request,
-            fileChunkSizeInBytes,
-            between(1, 8),
-            between(1, 8)
+            recoverySettings
         );
         primary.updateShardState(
             primary.routingEntry(),
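With this hunk the test framework obtains the handler from RecoverySourceHandlerFactory instead of constructing DefaultRecoverySourceHandler directly, with chunk size and concurrency now carried by RecoverySettings. The factory's body is not part of this diff; the following is only a hedged, self-contained model with plain Java stand-in types (not the OpenSearch classes), assuming the selection is keyed off whether the index uses a remote translog store:

```java
// Stand-in model only; the real RecoverySourceHandlerFactory is not shown in this diff.
public class FactorySelectionSketch {
    interface RecoverySourceHandler {}
    static class DefaultRecoverySourceHandler implements RecoverySourceHandler {}
    static class RemoteStoreReplicaRecoverySourceHandler implements RecoverySourceHandler {}

    // Assumed selection rule: remote-translog-backed indices get the remote store handler,
    // i.e. the one that passes skipCreateRetentionLeaseStep = true to phase1.
    static RecoverySourceHandler create(boolean remoteTranslogStoreEnabled) {
        return remoteTranslogStoreEnabled
            ? new RemoteStoreReplicaRecoverySourceHandler()
            : new DefaultRecoverySourceHandler();
    }

    public static void main(String[] args) {
        System.out.println(create(true).getClass().getSimpleName());  // RemoteStoreReplicaRecoverySourceHandler
        System.out.println(create(false).getClass().getSimpleName()); // DefaultRecoverySourceHandler
    }
}
```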