Rename args in cleanup process to match deletion process (#100620)
Relates #100568
DaveCTurner authored Oct 11, 2023
1 parent b5843e4 commit 3ea9779
Showing 1 changed file with 61 additions and 50 deletions.
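
In short, the cleanup path adopts the names already used by the deletion path: repositoryStateId becomes repositoryDataGeneration (originalRepositoryDataGeneration where it denotes the pre-cleanup state), deletedSnapshots becomes snapshotIds, foundIndices becomes originalIndexContainers, rootBlobs becomes originalRootBlobs, newRepoData becomes newRepositoryData, and repositoryMetaVersion becomes repositoryFormatIndexVersion. All of the touched methods live in the blob-store snapshot repository implementation. The only public signature affected, copied from the diff below:

    // before
    public void cleanup(long repositoryStateId, IndexVersion repositoryMetaVersion, ActionListener<RepositoryCleanupResult> listener)

    // after
    public void cleanup(
        long originalRepositoryDataGeneration,
        IndexVersion repositoryFormatIndexVersion,
        ActionListener<RepositoryCleanupResult> listener
    )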
@@ -785,16 +785,16 @@ public RepositoryStats stats() {
     /**
      * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well as with the assumed generation.
      *
-     * @param repositoryStateId Expected repository generation
-     * @param rootBlobs         Blobs at the repository root
+     * @param repositoryDataGeneration Expected repository generation
+     * @param rootBlobs                Blobs at the repository root
      * @return RepositoryData
      */
-    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetadata> rootBlobs) {
+    private RepositoryData safeRepositoryData(long repositoryDataGeneration, Map<String, BlobMetadata> rootBlobs) {
         final long generation = latestGeneration(rootBlobs.keySet());
         final long genToLoad;
         final RepositoryData cached;
         if (bestEffortConsistency) {
-            genToLoad = latestKnownRepoGen.accumulateAndGet(repositoryStateId, Math::max);
+            genToLoad = latestKnownRepoGen.accumulateAndGet(repositoryDataGeneration, Math::max);
             cached = null;
         } else {
             genToLoad = latestKnownRepoGen.get();
@@ -813,11 +813,11 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
                         + "]"
                 );
             }
-            if (genToLoad != repositoryStateId) {
+            if (genToLoad != repositoryDataGeneration) {
                 throw new RepositoryException(
                     metadata.name(),
                     "concurrent modification of the index-N file, expected current generation ["
-                        + repositoryStateId
+                        + repositoryDataGeneration
                         + "], actual current generation ["
                         + genToLoad
                         + "]"
@@ -1299,35 +1299,34 @@ private Iterator<String> resolveFilesToDelete(Collection<ShardSnapshotMetaDelete

     /**
      * Cleans up stale blobs directly under the repository root as well as all indices paths that aren't referenced by any existing
-     * snapshots. This method is only to be called directly after a new {@link RepositoryData} was written to the repository and with
-     * parameters {@code foundIndices}, {@code rootBlobs}
+     * snapshots. This method is only to be called directly after a new {@link RepositoryData} was written to the repository.
      *
-     * @param deletedSnapshots if this method is called as part of a delete operation, the snapshot ids just deleted or empty if called as
-     *                         part of a repository cleanup
-     * @param foundIndices     all indices blob containers found in the repository before {@code newRepoData} was written
-     * @param rootBlobs        all blobs found directly under the repository root
-     * @param newRepoData      new repository data that was just written
-     * @param listener         listener to invoke with the combined {@link DeleteResult} of all blobs removed in this operation
+     * @param snapshotIds             if this method is called as part of a delete operation, the snapshot ids just deleted, or empty if
+     *                                called as part of a repository cleanup
+     * @param originalIndexContainers all indices blob containers found in the repository before {@code newRepositoryData} was written
+     * @param originalRootBlobs       all blobs found directly under the repository root
+     * @param newRepositoryData       new repository data that was just written
+     * @param listener                listener to invoke with the combined {@link DeleteResult} of all blobs removed in this operation
      */
     private void cleanupStaleBlobs(
-        Collection<SnapshotId> deletedSnapshots,
-        Map<String, BlobContainer> foundIndices,
-        Map<String, BlobMetadata> rootBlobs,
-        RepositoryData newRepoData,
+        Collection<SnapshotId> snapshotIds,
+        Map<String, BlobContainer> originalIndexContainers,
+        Map<String, BlobMetadata> originalRootBlobs,
+        RepositoryData newRepositoryData,
         ActionListener<DeleteResult> listener
     ) {
         final var blobsDeleted = new AtomicLong();
         final var bytesDeleted = new AtomicLong();
         try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) {

-            final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
+            final List<String> staleRootBlobs = staleRootBlobs(newRepositoryData, originalRootBlobs.keySet());
             if (staleRootBlobs.isEmpty() == false) {
                 staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
                     try (ref) {
-                        logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
+                        logStaleRootLevelBlobs(newRepositoryData.getGenId() - 1, snapshotIds, staleRootBlobs);
                         deleteFromContainer(blobContainer(), staleRootBlobs.iterator());
                         for (final var staleRootBlob : staleRootBlobs) {
-                            bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length());
+                            bytesDeleted.addAndGet(originalRootBlobs.get(staleRootBlob).length());
                         }
                         blobsDeleted.addAndGet(staleRootBlobs.size());
                     } catch (Exception e) {
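
The shape of this method matters more than the renames: each batch of stale blobs is deleted on a background task, two AtomicLongs accumulate the counts, and the wrapped listener fires exactly once with the combined DeleteResult when the last ref is released. A minimal sketch of that fan-out/fan-in pattern using plain CompletableFuture in place of RefCountingListener and the task runner (all names hypothetical):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Supplier;

    final class CombinedDeleteSketch {
        record DeleteResult(long blobsDeleted, long bytesDeleted) {}

        // Run every deletion task concurrently; complete once with the summed totals.
        static CompletableFuture<DeleteResult> runAll(List<Supplier<DeleteResult>> deletionTasks) {
            final AtomicLong blobsDeleted = new AtomicLong();
            final AtomicLong bytesDeleted = new AtomicLong();
            final CompletableFuture<?>[] futures = deletionTasks.stream().map(task -> CompletableFuture.runAsync(() -> {
                final DeleteResult partial = task.get(); // one bundle of blob deletions
                blobsDeleted.addAndGet(partial.blobsDeleted());
                bytesDeleted.addAndGet(partial.bytesDeleted());
            })).toArray(CompletableFuture[]::new);
            // allOf plays the role of the final ref being released in RefCountingListener
            return CompletableFuture.allOf(futures)
                .thenApply(ignored -> new DeleteResult(blobsDeleted.get(), bytesDeleted.get()));
        }
    }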
@@ -1343,23 +1342,23 @@ private void cleanupStaleBlobs(
                 }));
             }

-            final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
-            for (final var indexEntry : foundIndices.entrySet()) {
-                final var indexSnId = indexEntry.getKey();
-                if (survivingIndexIds.contains(indexSnId)) {
+            final var survivingIndexIds = newRepositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
+            for (final var indexEntry : originalIndexContainers.entrySet()) {
+                final var indexId = indexEntry.getKey();
+                if (survivingIndexIds.contains(indexId)) {
                     continue;
                 }
                 staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
                     try (ref) {
-                        logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
+                        logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexId);
                         final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT);
                         blobsDeleted.addAndGet(deleteResult.blobsDeleted());
                         bytesDeleted.addAndGet(deleteResult.bytesDeleted());
-                        logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                        logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexId);
                     } catch (IOException e) {
                         logger.warn(() -> format("""
                             [%s] index %s is no longer part of any snapshot in the repository, \
-                            but failed to clean up its index folder""", metadata.name(), indexSnId), e);
+                            but failed to clean up its index folder""", metadata.name(), indexId), e);
                     }
                 }));
             }
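
The renamed loop is a set difference: an index container found on disk is deleted iff its id is absent from the new RepositoryData. The same selection as a standalone helper (hypothetical class and method names):

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    final class StaleIndexSelectionSketch {
        // Ids of index containers no longer referenced by the new repository data.
        static Set<String> staleIndexIds(Map<String, ?> originalIndexContainers, Set<String> survivingIndexIds) {
            return originalIndexContainers.keySet()
                .stream()
                .filter(indexId -> survivingIndexIds.contains(indexId) == false)
                .collect(Collectors.toSet());
        }
    }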
@@ -1386,40 +1385,45 @@ private void cleanupStaleBlobs(
      * <li>Deleting stale indices</li>
      * <li>Deleting unreferenced root level blobs</li>
      * </ul>
-     * @param repositoryStateId     Current repository state id
-     * @param repositoryMetaVersion version of the updated repository metadata to write
+     * @param originalRepositoryDataGeneration Current repository state id
+     * @param repositoryFormatIndexVersion     version of the updated repository metadata to write
      * @param listener              Listener to complete when done
      */
-    public void cleanup(long repositoryStateId, IndexVersion repositoryMetaVersion, ActionListener<RepositoryCleanupResult> listener) {
+    public void cleanup(
+        long originalRepositoryDataGeneration,
+        IndexVersion repositoryFormatIndexVersion,
+        ActionListener<RepositoryCleanupResult> listener
+    ) {
         try {
             if (isReadOnly()) {
                 throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
             }
-            Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT);
-            final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
-            final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT);
-            final Set<String> survivingIndexIds = repositoryData.getIndices()
+            Map<String, BlobMetadata> originalRootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT);
+            final RepositoryData originalRepositoryData = safeRepositoryData(originalRepositoryDataGeneration, originalRootBlobs);
+            final Map<String, BlobContainer> originalIndexContainers = blobStore().blobContainer(indicesPath())
+                .children(OperationPurpose.SNAPSHOT);
+            final Set<String> survivingIndexIds = originalRepositoryData.getIndices()
                 .values()
                 .stream()
                 .map(IndexId::getId)
                 .collect(Collectors.toSet());
-            final List<String> staleRootBlobs = staleRootBlobs(repositoryData, rootBlobs.keySet());
-            if (survivingIndexIds.equals(foundIndices.keySet()) && staleRootBlobs.isEmpty()) {
+            final List<String> staleRootBlobs = staleRootBlobs(originalRepositoryData, originalRootBlobs.keySet());
+            if (survivingIndexIds.equals(originalIndexContainers.keySet()) && staleRootBlobs.isEmpty()) {
                 // Nothing to clean up, we return
                 listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
             } else {
                 // write new index-N blob to ensure concurrent operations will fail
                 writeIndexGen(
-                    repositoryData,
-                    repositoryStateId,
-                    repositoryMetaVersion,
+                    originalRepositoryData,
+                    originalRepositoryDataGeneration,
+                    repositoryFormatIndexVersion,
                     Function.identity(),
                     listener.delegateFailureAndWrap(
                         (l, v) -> cleanupStaleBlobs(
                             Collections.emptyList(),
-                            foundIndices,
-                            rootBlobs,
-                            repositoryData,
+                            originalIndexContainers,
+                            originalRootBlobs,
+                            originalRepositoryData,
                             l.map(RepositoryCleanupResult::new)
                         )
                     )
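
A hypothetical call site, only to show how the renamed parameters read in context (the repository variable, the generation source, and the logging are assumptions, not code from this commit):

    repository.cleanup(
        currentRepositoryData.getGenId(),  // originalRepositoryDataGeneration, read before cleanup started
        IndexVersion.current(),            // repositoryFormatIndexVersion
        ActionListener.wrap(
            result -> logger.info("repository cleanup finished: {}", result),
            e -> logger.warn("repository cleanup failed", e)
        )
    );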
@@ -1431,9 +1435,12 @@ public void cleanup(long repositoryStateId, IndexVersion repositoryMetaVersion,
     }

     // Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData
-    private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
-        final Set<String> allSnapshotIds = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
-        return rootBlobNames.stream().filter(blob -> {
+    private static List<String> staleRootBlobs(RepositoryData originalRepositoryData, Set<String> originalRootBlobNames) {
+        final Set<String> allSnapshotIds = originalRepositoryData.getSnapshotIds()
+            .stream()
+            .map(SnapshotId::getUUID)
+            .collect(Collectors.toSet());
+        return originalRootBlobNames.stream().filter(blob -> {
             if (FsBlobContainer.isTempBlobName(blob)) {
                 return true;
             }
@@ -1452,7 +1459,7 @@ private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<St
             } else if (blob.startsWith(INDEX_FILE_PREFIX)) {
                 // TODO: Include the current generation here once we remove keeping index-(N-1) around from #writeIndexGen
                 try {
-                    return repositoryData.getGenId() > Long.parseLong(blob.substring(INDEX_FILE_PREFIX.length()));
+                    return originalRepositoryData.getGenId() > Long.parseLong(blob.substring(INDEX_FILE_PREFIX.length()));
                 } catch (NumberFormatException nfe) {
                     // odd case of an extra file with the index- prefix that we can't identify
                     return false;
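
To make the index-N branch concrete: with a current repository generation of 7, index-5 is stale (5 < 7, a superseded generation), index-7 is kept (the current generation; see the TODO about index-(N-1)), and index-oops is kept because its suffix fails to parse.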
@@ -1462,17 +1469,21 @@ private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<St
         }).toList();
     }

-    private void logStaleRootLevelBlobs(long previousGeneration, Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
+    private void logStaleRootLevelBlobs(
+        long originalRepositoryDataGeneration,
+        Collection<SnapshotId> snapshotIds,
+        List<String> blobsToDelete
+    ) {
         if (logger.isInfoEnabled()) {
             // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
             // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
             // delete would also log a confusing INFO message about "stale blobs".
-            final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
+            final Set<String> blobNamesToIgnore = snapshotIds.stream()
                 .flatMap(
                     snapshotId -> Stream.of(
                         GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
                         SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
-                        INDEX_FILE_PREFIX + previousGeneration
+                        INDEX_FILE_PREFIX + originalRepositoryDataGeneration
                     )
                 )
                 .collect(Collectors.toSet());
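
So for a delete of a single snapshot with UUID abc and an originalRepositoryDataGeneration of 41, blobNamesToIgnore would contain that snapshot's global metadata blob, its snapshot info blob, and index-41 (assuming the usual meta-<uuid>.dat / snap-<uuid>.dat blob naming); a routine snapshot delete therefore never reports its own just-deleted blobs as stale.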