From e60f7ed96fbe7b28ae6c3628f2eb12206313f742 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Sep 2021 13:08:35 +0200 Subject: [PATCH] Fix Temporarily Leaking Shard Level Metadata Blobs in some Cases (#76562) When doing out of order finalizations we would leak shard level metadata blobs at times. This commit enhances the cleanup logic after finalization to catch these leaked blobs and adds a test that would without this fix trip the leaked blobs assertion in the test infrastructure. --- .../snapshots/ConcurrentSnapshotsIT.java | 37 +++++++++++++++++++ .../cluster/SnapshotsInProgress.java | 32 ++++++++++++++++ .../repositories/FinalizeSnapshotContext.java | 23 +++++++++++- .../blobstore/BlobStoreRepository.java | 19 ++++++++-- .../snapshots/SnapshotsService.java | 2 - 5 files changed, 107 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 34eabc4e17298..0f53857357834 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -1796,6 +1796,43 @@ public void testQueuedAfterFailedShardSnapshot() throws Exception { assertEquals(snapshotsStatusResponse1, snapshotsStatusResponse3); } + public void testOutOfOrderFinalizationManySnapshots() throws Exception { + internalCluster().startMasterOnlyNode(); + final List dataNodes = internalCluster().startDataOnlyNodes(2); + final String index1 = "index-1"; + final String index2 = "index-2"; + createIndexWithContent(index1, dataNodes.get(0), dataNodes.get(1)); + createIndexWithContent(index2, dataNodes.get(1), dataNodes.get(0)); + + final String repository = "test-repo"; + createRepository(repository, "mock"); + + blockNodeWithIndex(repository, index2); + + final ActionFuture snapshot1 = 
clusterAdmin().prepareCreateSnapshot(repository, "snapshot-1") + .setIndices(index1, index2) + .setWaitForCompletion(true) + .execute(); + final ActionFuture snapshot2 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-2") + .setIndices(index1, index2) + .setWaitForCompletion(true) + .execute(); + awaitNumberOfSnapshotsInProgress(2); + final ActionFuture snapshot3 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-3") + .setIndices(index1) + .setWaitForCompletion(true) + .execute(); + assertSuccessful(snapshot3); + unblockAllDataNodes(repository); + assertSuccessful(snapshot1); + assertSuccessful(snapshot2); + + assertThat( + clusterAdmin().prepareSnapshotStatus().setSnapshots("snapshot-2").setRepository(repository).get().getSnapshots(), + hasSize(1) + ); + } + private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) { final SnapshotsStatusResponse snapshotsStatusResponse = client().admin() .cluster() diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 947ca3acde233..5459b13ed499b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -86,6 +86,38 @@ public Entry snapshot(final Snapshot snapshot) { return null; } + /** + * Computes a map of repository shard id to set of generations, containing all shard generations that became obsolete and may be + * deleted from the repository as the cluster state moved from the given {@code old} value of {@link SnapshotsInProgress} to this + * instance. 
+ */ + public Map> obsoleteGenerations(SnapshotsInProgress old) { + final Map> obsoleteGenerations = new HashMap<>(); + for (Entry entry : old.entries()) { + final Entry updatedEntry = snapshot(entry.snapshot()); + if (updatedEntry == null) { + continue; + } + for (ObjectObjectCursor oldShardAssignment : entry.shardsByRepoShardId()) { + final RepositoryShardId repositoryShardId = oldShardAssignment.key; + final ShardSnapshotStatus oldStatus = oldShardAssignment.value; + final ShardSnapshotStatus newStatus = updatedEntry.shardsByRepoShardId().get(repositoryShardId); + if (oldStatus.state == ShardState.SUCCESS + && oldStatus.generation() != null + && newStatus != null + && newStatus.state() == ShardState.SUCCESS + && newStatus.generation() != null + && oldStatus.generation().equals(newStatus.generation()) == false + ) { + // We moved from a non-null successful generation to a different non-null successful generation + // so the original generation is clearly obsolete because it was in-flight before and is now unreferenced everywhere. 
+ obsoleteGenerations.computeIfAbsent(repositoryShardId, ignored -> new HashSet<>()).add(oldStatus.generation()); + } + } + } + return Map.copyOf(obsoleteGenerations); + } + @Override public String getWriteableName() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java index 646a44b77a425..ad10b2b915794 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -8,14 +8,19 @@ package org.elasticsearch.repositories; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.core.Tuple; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; +import java.util.Map; +import java.util.Set; + /** * Context for finalizing a snapshot. */ @@ -25,6 +30,12 @@ public final class FinalizeSnapshotContext extends ActionListener.Delegating< private final ShardGenerations updatedShardGenerations; + /** + * Obsolete shard generations map computed from the cluster state update that this finalization executed in + * {@link #updatedClusterState}, which populates it. 
+ */ + private final SetOnce>> obsoleteGenerations = new SetOnce<>(); + private final long repositoryStateId; private final Metadata clusterMetadata; @@ -78,8 +89,18 @@ public Metadata clusterMetadata() { return clusterMetadata; } + public Map> obsoleteShardGenerations() { + assert obsoleteGenerations.get() != null : "must only be called after #updatedClusterState"; + return obsoleteGenerations.get(); + } + public ClusterState updatedClusterState(ClusterState state) { - return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot()); + final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot()); + obsoleteGenerations.set( + updatedState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .obsoleteGenerations(state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)) + ); + return updatedState; } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 301f3651f03f9..e5b22d82bec7c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1402,7 +1402,7 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte finalizeSnapshotContext::updatedClusterState, ActionListener.wrap(newRepoData -> { if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, newRepoData); + cleanupOldShardGens(existingRepositoryData, newRepoData, finalizeSnapshotContext); } finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo)); }, onUpdateFailure) @@ -1457,8 +1457,12 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte } // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data - private void 
cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) { - final List toDelete = new ArrayList<>(); + private void cleanupOldShardGens( + RepositoryData existingRepositoryData, + RepositoryData updatedRepositoryData, + FinalizeSnapshotContext finalizeSnapshotContext + ) { + final Set toDelete = new HashSet<>(); final int prefixPathLen = basePath().buildAsString().length(); updatedRepositoryData.shardGenerations() .obsoleteShardGenerations(existingRepositoryData.shardGenerations()) @@ -1469,6 +1473,15 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito ) ) ); + for (Map.Entry> obsoleteEntry : finalizeSnapshotContext.obsoleteShardGenerations() + .entrySet()) { + final String containerPath = shardContainer(obsoleteEntry.getKey().index(), obsoleteEntry.getKey().shardId()).path() + .buildAsString() + .substring(prefixPathLen) + INDEX_FILE_PREFIX; + for (ShardGeneration shardGeneration : obsoleteEntry.getValue()) { + toDelete.add(containerPath + shardGeneration); + } + } try { deleteFromContainer(blobContainer(), toDelete.iterator()); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a8f389300e62b..3ad43c9f5c1d8 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1816,8 +1816,6 @@ private static Tuple> read * @return updated cluster state */ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) { - // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance - // BlobStoreTestUtil to catch this leak SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); ClusterState result = state; int indexOfEntry = -1;