Skip to content

Commit

Permalink
retries + expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Nov 30, 2021
1 parent 54363a1 commit cf66c46
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,8 @@ public void apply(Settings value, Settings current, Settings previous) {
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING,
SnapshotsService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING,
SnapshotsService.PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING,
RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING,
FsHealthService.ENABLED_SETTING,
FsHealthService.REFRESH_INTERVAL_SETTING,
Expand Down
171 changes: 112 additions & 59 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,15 @@ public SnapshotsService(
maxConcurrentOperations = MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i);
pendingDeletionsRetryInterval = PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING, t -> pendingDeletionsRetryInterval = t);
pendingDeletionsExpirationInterval = PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING,
t -> pendingDeletionsExpirationInterval = t
);
}
this.systemIndexDescriptorMap = systemIndexDescriptorMap;
}
Expand Down Expand Up @@ -1297,6 +1306,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
*/
private final Set<SnapshotId> ongoingSnapshotsDeletions = ConcurrentCollections.newConcurrentSet();

// only used in tests
public boolean hasOngoingSnapshotsDeletions(SnapshotId snapshotId) {
return ongoingSnapshotsDeletions.contains(snapshotId);
}

/**
* Set of pending snapshots deletions whose deletion is conflicting with on-going restores/clones/repository clean up or repository
* missing or read-only. Those sets are used to identify the cluster state updates to process in case they resolve some conflict.
Expand All @@ -1309,7 +1323,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
/**
* Find snapshots to delete in the the cluster state and triggers explicit snapshot delete requests. This method attempts to detect
* conflicting situations where triggering the snapshot deletion would likely fail due to a concurrent snapshot operation. In such
* cases the snapshot deletion is not triggered as it should be triggered by subsequent cluster state updates on the conflicting
* cases the snapshot deletion is not triggered as it should be triggered by subsequent cluster state updates once the conflicting
* situation is resolved.
*
* The repository name and uuid information are extracted from the {@link SnapshotDeletionsPending} entries in order to find the
Expand Down Expand Up @@ -1343,8 +1357,8 @@ private void triggerSnapshotsPendingDeletions(final ClusterChangedEvent event) {
final Set<SnapshotId> currentRestores = restoreSources(state);
final Set<SnapshotId> currentClones = cloneSources(state);

// the list of snapshot ids to trigger deletion for, per repository
final Map<RepositoryMetadata, Set<SnapshotId>> snapshotsToDelete = new HashMap<>();
// the snapshots to trigger deletion for, per repository
final Map<RepositoryMetadata, Map<SnapshotId, Long>> snapshotsToDelete = new HashMap<>();

for (SnapshotDeletionsPending.Entry snapshot : snapshotDeletionsPending.entries()) {
final SnapshotId snapshotId = snapshot.getSnapshotId();
Expand Down Expand Up @@ -1413,11 +1427,11 @@ private void triggerSnapshotsPendingDeletions(final ClusterChangedEvent event) {
}
pendingDeletionsWithConflictingRepos.remove(snapshotId);

// should we add some throttling to not always retry?
if (ongoingSnapshotsDeletions.add(snapshotId)) {
logger.info("triggering snapshot deletion for [{}]", snapshotId);
final boolean added = snapshotsToDelete.computeIfAbsent(repository, r -> new HashSet<>()).add(snapshotId);
assert added : snapshotId;
final Long previous = snapshotsToDelete.computeIfAbsent(repository, r -> new HashMap<>())
.put(snapshotId, snapshot.getIndexDeletionTime());
assert previous == null : snapshotId;
}
}
snapshotsToDelete.forEach(
Expand Down Expand Up @@ -1481,116 +1495,155 @@ private boolean pendingDeletionsWithConflictsChanged(ClusterChangedEvent event)
return false;
}

public static final Setting<TimeValue> PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING = Setting.timeSetting(
"snapshot.snapshot_deletions_pending.retry_interval",
TimeValue.timeValueSeconds(30L),
TimeValue.ZERO,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING = Setting.timeSetting(
"snapshot.snapshot_deletions_pending.expiration_interval",
TimeValue.timeValueHours(12L),
TimeValue.ZERO,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile TimeValue pendingDeletionsRetryInterval;
private volatile TimeValue pendingDeletionsExpirationInterval;

private boolean isExpiredPendingDeletion(long deletionTimeEpochMillis) {
return Instant.ofEpochMilli(deletionTimeEpochMillis)
.plusMillis(pendingDeletionsExpirationInterval.getMillis())
.isBefore(Instant.now());
}

/**
* A {@link Runnable} used to process the deletion of snapshots marked as to delete for a given repository.
*/
private class SnapshotsToDeleteRunnable extends AbstractRunnable {

private final Set<SnapshotId> snapshotIdsToDelete;
private final Map<SnapshotId, Long> snapshots;
private final String repositoryName;
private final String repositoryUuid;
private final boolean missingUuid;

SnapshotsToDeleteRunnable(String repositoryName, String repositoryUuid, Set<SnapshotId> snapshotIdsToDelete) {
SnapshotsToDeleteRunnable(String repositoryName, String repositoryUuid, Map<SnapshotId, Long> snapshots) {
this.repositoryName = Objects.requireNonNull(repositoryName);
this.repositoryUuid = Objects.requireNonNull(repositoryUuid);
this.snapshotIdsToDelete = Objects.requireNonNull(snapshotIdsToDelete);
assert snapshotIdsToDelete.isEmpty() == false;
this.snapshots = Objects.requireNonNull(snapshots);
this.missingUuid = RepositoryData.MISSING_UUID.equals(repositoryUuid);
}

@Override
protected void doRun() throws Exception {
final List<SnapshotId> missingSnapshots = new CopyOnWriteArrayList<>();
final CountDown countDown = new CountDown(snapshotIdsToDelete.size());
final Set<SnapshotId> pendingDeletionsToRemove = ConcurrentCollections.newConcurrentSet();
final CountDown countDown = new CountDown(snapshots.size());

for (SnapshotId snapshotId : snapshotIdsToDelete) {
for (Map.Entry<SnapshotId, Long> snapshot : snapshots.entrySet()) {
final SnapshotId snapshotId = snapshot.getKey();
final ActionListener<Void> listener = new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.debug("[{}] snapshot marked as to delete [{}] is now deleted", repositoryName, snapshotId);
removeSnapshot(snapshotId, true);
logger.debug(
"snapshot marked as to delete [{}] successfully deleted from repository [{}/{}]",
snapshotId,
repositoryName,
repositoryUuid
);
pendingDeletionsToRemove.add(snapshotId);
finish();
}

@Override
public void onFailure(Exception e) {
boolean shouldRetry = true;
if (e instanceof SnapshotMissingException) {
if (e instanceof SnapshotMissingException && missingUuid == false) {
pendingDeletionsToRemove.add(snapshotId);
logger.debug(
() -> new ParameterizedMessage(
"[{}] snapshot to delete [{}] is already deleted or is missing",
"snapshot marked as to delete [{}] is missing in repository [{}/{}], removing from pending deletions",
snapshotId,
repositoryName,
snapshotId
repositoryUuid
),
e
);
// only retry missing snapshots if the repository uuid is unknown, otherwise the snapshot pending deletion entry
// is removed from the cluster state as we know it does not exist anymore in the repository
shouldRetry = RepositoryData.MISSING_UUID.equals(repositoryUuid) == false;

} else if (e instanceof ConcurrentSnapshotExecutionException) {
assert false : e;
logger.debug(
"[{}] failed to delete snapshot [{}]: a concurrent operation is running",
repositoryName,
snapshotId
);
} else if (e instanceof RepositoryMissingException) {
} else if (isExpiredPendingDeletion(snapshot.getValue())) {
pendingDeletionsToRemove.add(snapshotId);
logger.warn(
() -> new ParameterizedMessage(
"[{}] failed to delete snapshot [{}]: repository has been removed before snapshot marked as "
+ "to delete could be deleted, the snapshot might be leaking",
"snapshot marked as to delete [{}] failed to be deleted within [{}]. The pending snapshot "
+ "expired before the snapshot could be deleted from the repository and as such might still "
+ "exist in the original repository [{}/{}]. This snapshot will now be removed from the list of "
+ "pending deletions.",
snapshotId,
pendingDeletionsExpirationInterval,
repositoryName,
snapshotId
repositoryUuid
),
e
);
} else {
logger.warn(
() -> new ParameterizedMessage("[{}] failed to delete snapshot [{}]", repositoryName, snapshotId),
logger.debug(
() -> new ParameterizedMessage(
"[{}/{}] attempt to delete snapshot marked as to delete [{}] failed; deletion will be retried in [{}]",
repositoryName,
repositoryUuid,
snapshotId,
pendingDeletionsRetryInterval
),
e
);
}
removeSnapshot(snapshotId, shouldRetry);
finish();
}

void removeSnapshot(SnapshotId snapshotId, boolean shouldRetry) {
if (shouldRetry) {
final boolean removed = ongoingSnapshotsDeletions.remove(snapshotId);
assert removed : "snapshot to delete [" + snapshotId + "] not found";
} else {
missingSnapshots.add(snapshotId);
}
if (countDown.countDown() && missingSnapshots.isEmpty() == false) {
clusterService.submitStateUpdateTask(
"remove-missing-snapshot-deletions-in-pending",
new ClusterStateUpdateTask() {
void finish() {
if (countDown.countDown()) {
final Map<SnapshotId, Long> retryables = snapshots.entrySet()
.stream()
.filter(snap -> pendingDeletionsToRemove.contains(snap.getKey()) == false)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (retryables.isEmpty() == false) {
// TODO maybe re-resolve repository here if the uuid is missing?
threadPool.scheduleUnlessShuttingDown(
pendingDeletionsRetryInterval,
ThreadPool.Names.GENERIC,
new SnapshotsToDeleteRunnable(repositoryName, repositoryUuid, retryables)
);
}
if (pendingDeletionsToRemove.isEmpty() == false) {
clusterService.submitStateUpdateTask("remove-snapshot-deletions-in-pending", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final SnapshotDeletionsPending deletionsInPending = currentState.custom(
final SnapshotDeletionsPending currentPendings = currentState.custom(
SnapshotDeletionsPending.TYPE,
SnapshotDeletionsPending.EMPTY
);
final SnapshotDeletionsPending updatedDeletionsInPending = deletionsInPending.withRemovedSnapshots(
missingSnapshots
final SnapshotDeletionsPending updatedPendings = currentPendings.withRemovedSnapshots(
List.copyOf(pendingDeletionsToRemove)
);
if (deletionsInPending == updatedDeletionsInPending) {
if (currentPendings == updatedPendings) {
return currentState;
}
return ClusterState.builder(currentState)
.putCustom(SnapshotDeletionsPending.TYPE, updatedDeletionsInPending)
.putCustom(SnapshotDeletionsPending.TYPE, updatedPendings)
.build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
missingSnapshots.forEach(ongoingSnapshotsDeletions::remove);
ongoingSnapshotsDeletions.removeAll(pendingDeletionsToRemove);
}

@Override
public void onFailure(String source, Exception e) {
missingSnapshots.forEach(ongoingSnapshotsDeletions::remove);
ongoingSnapshotsDeletions.removeAll(pendingDeletionsToRemove);
}
}
);
});
}
}
}
};
Expand All @@ -1614,9 +1667,9 @@ public void onFailure(String source, Exception e) {

@Override
public void onFailure(Exception e) {
ongoingSnapshotsDeletions.removeAll(snapshotIdsToDelete);
ongoingSnapshotsDeletions.removeAll(snapshots.keySet());
logger.warn(
() -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshots {}", repositoryName, snapshotIdsToDelete),
() -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshots {}", repositoryName, snapshots.keySet()),
e
);
}
Expand Down
Loading

0 comments on commit cf66c46

Please sign in to comment.