Skip to content

Commit

Permalink
Do not release snapshot file download permit during recovery retries (#…
Browse files Browse the repository at this point in the history
…79438)

Relates #79316
Backport of #79409
  • Loading branch information
fcofdez authored Oct 19, 2021
1 parent c6e5483 commit 703d6c2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1009,6 +1010,70 @@ public void testRecoveryReEstablishKeepsTheGrantedSnapshotFileDownloadPermit() t
});
}

public void testRecoveryRetryKeepsTheGrantedSnapshotFileDownloadPermit() throws Exception {
executeRecoveryWithSnapshotFileDownloadThrottled((indices,
sourceNode,
targetNode,
targetMockTransportService,
recoverySnapshotFileRequests,
awaitForRecoverSnapshotFileRequestReceived,
respondToRecoverSnapshotFile) -> {
MockTransportService sourceMockTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, sourceNode);

CountDownLatch startRecoveryRetryReceived = new CountDownLatch(1);
AtomicBoolean delayRecoveryExceptionSent = new AtomicBoolean();
sourceMockTransportService.addRequestHandlingBehavior(PeerRecoverySourceService.Actions.START_RECOVERY,
(handler, request, channel, task) -> {
if (delayRecoveryExceptionSent.compareAndSet(false, true)) {
channel.sendResponse(new DelayRecoveryException("delay"));
} else {
startRecoveryRetryReceived.countDown();
handler.messageReceived(request, channel, task);
}
});

String indexRecoveredFromSnapshot1 = indices.get(0);
assertAcked(
client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", targetNode)).get()
);

startRecoveryRetryReceived.await();
sourceMockTransportService.clearAllRules();
awaitForRecoverSnapshotFileRequestReceived.run();

String indexRecoveredFromPeer = indices.get(1);
assertAcked(
client().admin().indices().prepareUpdateSettings(indexRecoveredFromPeer)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", targetNode)).get()
);

ensureGreen(indexRecoveredFromPeer);
assertPeerRecoveryDidNotUseSnapshots(indexRecoveredFromPeer, sourceNode, targetNode);

respondToRecoverSnapshotFile.run();

ensureGreen(indexRecoveredFromSnapshot1);
assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot1, sourceNode, targetNode);

targetMockTransportService.clearAllRules();

final String indexRecoveredFromSnapshot2 = indices.get(2);
assertAcked(
client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", targetNode)).get()
);

ensureGreen(indexRecoveredFromSnapshot2);
assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode);
});
}


private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecoveryThrottlingTestCase testCase) throws Exception {
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1");
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), "1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final SnapshotFilesProvider snapshotFilesProvider;
@Nullable // if we're not downloading files from snapshots in this recovery
private final Releasable snapshotFileDownloadsPermit;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
Expand All @@ -84,6 +82,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget

private volatile boolean recoveryMonitorEnabled = true;

@Nullable // if we're not downloading files from snapshots in this recovery or we're retrying
private volatile Releasable snapshotFileDownloadsPermit;

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -126,7 +127,11 @@ public RecoveryTarget(IndexShard indexShard,
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
// If we're retrying we should remove the reference from this instance as the underlying resources
// get released after the retry copy is created
Releasable snapshotFileDownloadsPermitCopy = snapshotFileDownloadsPermit;
snapshotFileDownloadsPermit = null;
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermitCopy, listener);
}

@Nullable
Expand Down

0 comments on commit 703d6c2

Please sign in to comment.