diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java index 75a1d35b853a8..48aa03c07abe3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java @@ -73,6 +73,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -1010,6 +1011,70 @@ public void testRecoveryReEstablishKeepsTheGrantedSnapshotFileDownloadPermit() t }); } + public void testRecoveryRetryKeepsTheGrantedSnapshotFileDownloadPermit() throws Exception { + executeRecoveryWithSnapshotFileDownloadThrottled((indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + MockTransportService sourceMockTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, sourceNode); + + CountDownLatch startRecoveryRetryReceived = new CountDownLatch(1); + AtomicBoolean delayRecoveryExceptionSent = new AtomicBoolean(); + sourceMockTransportService.addRequestHandlingBehavior(PeerRecoverySourceService.Actions.START_RECOVERY, + (handler, request, channel, task) -> { + if (delayRecoveryExceptionSent.compareAndSet(false, true)) { + channel.sendResponse(new DelayRecoveryException("delay")); + } else { + startRecoveryRetryReceived.countDown(); + handler.messageReceived(request, channel, task); + } + }); + + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + startRecoveryRetryReceived.await(); + sourceMockTransportService.clearAllRules(); + awaitForRecoverSnapshotFileRequestReceived.run(); + + String indexRecoveredFromPeer = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromPeer) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromPeer); + assertPeerRecoveryDidNotUseSnapshots(indexRecoveredFromPeer, sourceNode, targetNode); + + respondToRecoverSnapshotFile.run(); + + ensureGreen(indexRecoveredFromSnapshot1); + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot1, sourceNode, targetNode); + + targetMockTransportService.clearAllRules(); + + final String indexRecoveredFromSnapshot2 = indices.get(2); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + }); + } + + private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecoveryThrottlingTestCase testCase) throws Exception { updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1"); updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), "1"); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 1a8e29e48c2de..f3406cf22cf71 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -67,8 +67,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final IndexShard indexShard; private final DiscoveryNode sourceNode; private final SnapshotFilesProvider snapshotFilesProvider; - @Nullable // if we're not downloading files from snapshots in this recovery - private final Releasable snapshotFileDownloadsPermit; private final MultiFileWriter multiFileWriter; private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); private final Store store; @@ -83,6 +81,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private volatile boolean recoveryMonitorEnabled = true; + @Nullable // if we're not downloading files from snapshots in this recovery or we're retrying + private volatile Releasable snapshotFileDownloadsPermit; + // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); @@ -125,7 +126,11 @@ public RecoveryTarget(IndexShard indexShard, * @return a copy of this recovery target */ public RecoveryTarget retryCopy() { - return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener); + // If we're retrying we should remove the reference from this instance as the underlying resources + // get released after the retry copy is created + Releasable snapshotFileDownloadsPermitCopy = snapshotFileDownloadsPermit; + snapshotFileDownloadsPermit = null; + return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermitCopy, listener); } @Nullable