From cf66c462179b0a1400ecb6b6637ecc6de456ab5a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 30 Nov 2021 17:02:07 +0100 Subject: [PATCH] retries + expiration --- .../common/settings/ClusterSettings.java | 2 + .../snapshots/SnapshotsService.java | 171 ++++++++++++------ ...leSnapshotsPendingDeletionsIntegTests.java | 80 ++++++++ 3 files changed, 194 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index f2baa71c171a6..291ff5a03960d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -486,6 +486,8 @@ public void apply(Settings value, Settings current, Settings previous) { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, + SnapshotsService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING, + SnapshotsService.PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING, RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING, FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 3423d0e2d581e..1860157d44173 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -226,6 +226,15 @@ public SnapshotsService( maxConcurrentOperations = MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); + pendingDeletionsRetryInterval = PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING, t -> pendingDeletionsRetryInterval = t); + pendingDeletionsExpirationInterval = PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING, + t -> pendingDeletionsExpirationInterval = t + ); } this.systemIndexDescriptorMap = systemIndexDescriptorMap; } @@ -1297,6 +1306,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS */ private final Set ongoingSnapshotsDeletions = ConcurrentCollections.newConcurrentSet(); + // only used in tests + public boolean hasOngoingSnapshotsDeletions(SnapshotId snapshotId) { + return ongoingSnapshotsDeletions.contains(snapshotId); + } + /** * Set of pending snapshots deletions whose deletion is conflicting with on-going restores/clones/repository clean up or repository * missing or read-only. Those sets are used to identify the cluster state updates to process in case they resolve some conflict. @@ -1309,7 +1323,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS /** * Find snapshots to delete in the the cluster state and triggers explicit snapshot delete requests. This method attempts to detect * conflicting situations where triggering the snapshot deletion would likely fail due to a concurrent snapshot operation. In such - * cases the snapshot deletion is not triggered as it should be triggered by subsequent cluster state updates on the conflicting + * cases the snapshot deletion is not triggered as it should be triggered by subsequent cluster state updates once the conflicting * situation is resolved. * * The repository name and uuid information are extracted from the {@link SnapshotDeletionsPending} entries in order to find the @@ -1343,8 +1357,8 @@ private void triggerSnapshotsPendingDeletions(final ClusterChangedEvent event) { final Set currentRestores = restoreSources(state); final Set currentClones = cloneSources(state); - // the list of snapshot ids to trigger deletion for, per repository - final Map> snapshotsToDelete = new HashMap<>(); + // the snapshots to trigger deletion for, per repository + final Map> snapshotsToDelete = new HashMap<>(); for (SnapshotDeletionsPending.Entry snapshot : snapshotDeletionsPending.entries()) { final SnapshotId snapshotId = snapshot.getSnapshotId(); @@ -1413,11 +1427,11 @@ private void triggerSnapshotsPendingDeletions(final ClusterChangedEvent event) { } pendingDeletionsWithConflictingRepos.remove(snapshotId); - // should we add some throttling to not always retry? if (ongoingSnapshotsDeletions.add(snapshotId)) { logger.info("triggering snapshot deletion for [{}]", snapshotId); - final boolean added = snapshotsToDelete.computeIfAbsent(repository, r -> new HashSet<>()).add(snapshotId); - assert added : snapshotId; + final Long previous = snapshotsToDelete.computeIfAbsent(repository, r -> new HashMap<>()) + .put(snapshotId, snapshot.getIndexDeletionTime()); + assert previous == null : snapshotId; } } snapshotsToDelete.forEach( @@ -1481,116 +1495,155 @@ private boolean pendingDeletionsWithConflictsChanged(ClusterChangedEvent event) return false; } + public static final Setting PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING = Setting.timeSetting( + "snapshot.snapshot_deletions_pending.retry_interval", + TimeValue.timeValueSeconds(30L), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING = Setting.timeSetting( + "snapshot.snapshot_deletions_pending.expiration_interval", + TimeValue.timeValueHours(12L), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private volatile TimeValue pendingDeletionsRetryInterval; + private volatile TimeValue pendingDeletionsExpirationInterval; + + private boolean isExpiredPendingDeletion(long deletionTimeEpochMillis) { + return Instant.ofEpochMilli(deletionTimeEpochMillis) + .plusMillis(pendingDeletionsExpirationInterval.getMillis()) + .isBefore(Instant.now()); + } + /** * A {@link Runnable} used to process the deletion of snapshots marked as to delete for a given repository. */ private class SnapshotsToDeleteRunnable extends AbstractRunnable { - private final Set snapshotIdsToDelete; + private final Map snapshots; private final String repositoryName; private final String repositoryUuid; + private final boolean missingUuid; - SnapshotsToDeleteRunnable(String repositoryName, String repositoryUuid, Set snapshotIdsToDelete) { + SnapshotsToDeleteRunnable(String repositoryName, String repositoryUuid, Map snapshots) { this.repositoryName = Objects.requireNonNull(repositoryName); this.repositoryUuid = Objects.requireNonNull(repositoryUuid); - this.snapshotIdsToDelete = Objects.requireNonNull(snapshotIdsToDelete); - assert snapshotIdsToDelete.isEmpty() == false; + this.snapshots = Objects.requireNonNull(snapshots); + this.missingUuid = RepositoryData.MISSING_UUID.equals(repositoryUuid); } @Override protected void doRun() throws Exception { - final List missingSnapshots = new CopyOnWriteArrayList<>(); - final CountDown countDown = new CountDown(snapshotIdsToDelete.size()); + final Set pendingDeletionsToRemove = ConcurrentCollections.newConcurrentSet(); + final CountDown countDown = new CountDown(snapshots.size()); - for (SnapshotId snapshotId : snapshotIdsToDelete) { + for (Map.Entry snapshot : snapshots.entrySet()) { + final SnapshotId snapshotId = snapshot.getKey(); final ActionListener listener = new ActionListener() { @Override public void onResponse(Void unused) { - logger.debug("[{}] snapshot marked as to delete [{}] is now deleted", repositoryName, snapshotId); - removeSnapshot(snapshotId, true); + logger.debug( + "snapshot marked as to delete [{}] successfully deleted from repository [{}/{}]", + snapshotId, + repositoryName, + repositoryUuid + ); + pendingDeletionsToRemove.add(snapshotId); + finish(); } @Override public void onFailure(Exception e) { - boolean shouldRetry = true; - if (e instanceof SnapshotMissingException) { + if (e instanceof SnapshotMissingException && missingUuid == false) { + pendingDeletionsToRemove.add(snapshotId); logger.debug( () -> new ParameterizedMessage( - "[{}] snapshot to delete [{}] is already deleted or is missing", + "snapshot marked as to delete [{}] is missing in repository [{}/{}], removing from pending deletions", + snapshotId, repositoryName, - snapshotId + repositoryUuid ), e ); - // only retry missing snapshots if the repository uuid is unknown, otherwise the snapshot pending deletion entry - // is removed from the cluster state as we know it does not exist anymore in the repository - shouldRetry = RepositoryData.MISSING_UUID.equals(repositoryUuid) == false; - - } else if (e instanceof ConcurrentSnapshotExecutionException) { - assert false : e; - logger.debug( - "[{}] failed to delete snapshot [{}]: a concurrent operation is running", - repositoryName, - snapshotId - ); - } else if (e instanceof RepositoryMissingException) { + } else if (isExpiredPendingDeletion(snapshot.getValue())) { + pendingDeletionsToRemove.add(snapshotId); logger.warn( () -> new ParameterizedMessage( - "[{}] failed to delete snapshot [{}]: repository has been removed before snapshot marked as " - + "to delete could be deleted, the snapshot might be leaking", + "snapshot marked as to delete [{}] failed to be deleted within [{}]. The pending snapshot " + + "expired before the snapshot could be deleted from the repository and as such might still " + + "exist in the original repository [{}/{}]. This snapshot will now be removed from the list of " + + "pending deletions.", + snapshotId, + pendingDeletionsExpirationInterval, repositoryName, - snapshotId + repositoryUuid ), e ); } else { - logger.warn( - () -> new ParameterizedMessage("[{}] failed to delete snapshot [{}]", repositoryName, snapshotId), + logger.debug( + () -> new ParameterizedMessage( + "[{}/{}] attempt to delete snapshot marked as to delete [{}] failed; deletion will be retried in [{}]", + repositoryName, + repositoryUuid, + snapshotId, + pendingDeletionsRetryInterval + ), e ); } - removeSnapshot(snapshotId, shouldRetry); + finish(); } - void removeSnapshot(SnapshotId snapshotId, boolean shouldRetry) { - if (shouldRetry) { - final boolean removed = ongoingSnapshotsDeletions.remove(snapshotId); - assert removed : "snapshot to delete [" + snapshotId + "] not found"; - } else { - missingSnapshots.add(snapshotId); - } - if (countDown.countDown() && missingSnapshots.isEmpty() == false) { - clusterService.submitStateUpdateTask( - "remove-missing-snapshot-deletions-in-pending", - new ClusterStateUpdateTask() { + void finish() { + if (countDown.countDown()) { + final Map retryables = snapshots.entrySet() + .stream() + .filter(snap -> pendingDeletionsToRemove.contains(snap.getKey()) == false) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if (retryables.isEmpty() == false) { + // TODO maybe re-resolve repository here if the uuid is missing? + threadPool.scheduleUnlessShuttingDown( + pendingDeletionsRetryInterval, + ThreadPool.Names.GENERIC, + new SnapshotsToDeleteRunnable(repositoryName, repositoryUuid, retryables) + ); + } + if (pendingDeletionsToRemove.isEmpty() == false) { + clusterService.submitStateUpdateTask("remove-snapshot-deletions-in-pending", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final SnapshotDeletionsPending deletionsInPending = currentState.custom( + final SnapshotDeletionsPending currentPendings = currentState.custom( SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY ); - final SnapshotDeletionsPending updatedDeletionsInPending = deletionsInPending.withRemovedSnapshots( - missingSnapshots + final SnapshotDeletionsPending updatedPendings = currentPendings.withRemovedSnapshots( + List.copyOf(pendingDeletionsToRemove) ); - if (deletionsInPending == updatedDeletionsInPending) { + if (currentPendings == updatedPendings) { return currentState; } return ClusterState.builder(currentState) - .putCustom(SnapshotDeletionsPending.TYPE, updatedDeletionsInPending) + .putCustom(SnapshotDeletionsPending.TYPE, updatedPendings) .build(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - missingSnapshots.forEach(ongoingSnapshotsDeletions::remove); + ongoingSnapshotsDeletions.removeAll(pendingDeletionsToRemove); } @Override public void onFailure(String source, Exception e) { - missingSnapshots.forEach(ongoingSnapshotsDeletions::remove); + ongoingSnapshotsDeletions.removeAll(pendingDeletionsToRemove); } - } - ); + }); + } } } }; @@ -1614,9 +1667,9 @@ public void onFailure(String source, Exception e) { @Override public void onFailure(Exception e) { - ongoingSnapshotsDeletions.removeAll(snapshotIdsToDelete); + ongoingSnapshotsDeletions.removeAll(snapshots.keySet()); logger.warn( - () -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshots {}", repositoryName, snapshotIdsToDelete), + () -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshots {}", repositoryName, snapshots.keySet()), e ); } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java index dffd35023865c..593f406b5f597 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException; @@ -35,6 +36,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; @@ -57,6 +59,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; public class SearchableSnapshotsPendingDeletionsIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase { @@ -418,6 +421,83 @@ public void testSearchableSnapshotIsDeletedWithOnRepoCleanUp() throws Exception }); } + public void testSearchableSnapshotIsDeletedAfterExpiration() throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + SnapshotsService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(randomLongBetween(100L, 1000L)) + ) + .build() + ) + ); + + assertAcked( + clusterAdmin().preparePutRepository(repository) + .setVerify(false) + .setType("mock") + .setSettings( + Settings.builder() + .put(getRepositorySettings(repository).build()) + .put("random_control_io_exception_rate", 1.0) + .build() + ) + ); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + + final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); + assertBusy(() -> assertTrue(snapshotsService.hasOngoingSnapshotsDeletions(snapshot))); + + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(SnapshotsService.PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.getKey(), TimeValue.ZERO) + .build() + ) + ); + + assertBusy(() -> assertFalse(snapshotsService.hasOngoingSnapshotsDeletions(snapshot))); + awaitNoMoreSnapshotsDeletions(); + + } catch (Exception e) { + throw new AssertionError(e); + } finally { + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putNull(SnapshotsService.PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.getKey()) + .putNull(SnapshotsService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.getKey()) + .build() + ) + ); + assertAcked( + clusterAdmin().preparePutRepository(repository) + .setVerify(false) + .setType("mock") + .setSettings( + Settings.builder() + .put(getRepositorySettings(repository).build()) + .put("random_control_io_exception_rate", 0.0) + .build() + ) + ); + } + + assertThat( + client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get().getSnapshots(), + hasSize(1) + ); + }); + } + private void mountIndexThenExecute(final TriConsumer test) throws Exception { final String suffix = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); final String repository = "repository-" + suffix;