Skip to content

Commit

Permalink
Optimize Snapshot Finalization (#42723) (#43908)
Browse files Browse the repository at this point in the history
* Optimize Snapshot Finalization

* Delete index-N blobs and segement blobs in one single bulk delete instead of in separate ones to save RPC calls on implementations that have bulk deletes implemented
* Don't fail snapshot because deleting old index-N failed, this results in needlessly logging finalization failures and makes analysis of failures harder going forward as well as incorrect index.latest blobs
  • Loading branch information
original-brownbear authored Jul 3, 2019
1 parent 662f517 commit 7059224
Showing 1 changed file with 22 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -728,12 +728,6 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob, snapshotsBytes, true);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (isReadOnly() == false && newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
}

// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand All @@ -742,6 +736,15 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
try {
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
} catch (IOException e) {
logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
}
}
}

/**
Expand Down Expand Up @@ -995,45 +998,24 @@ protected void finalize(final List<SnapshotFiles> snapshots,
final Map<String, BlobMetaData> blobs,
final String reason) {
final String indexGeneration = Integer.toString(fileListGeneration);
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
final List<String> blobNames =
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
snapshotId, shardId), e);
throw e;
}

// If we deleted all snapshots, we don't need to create a new index file
if (snapshots.size() > 0) {
final List<String> blobsToDelete;
if (snapshots.isEmpty()) {
// If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
blobsToDelete = new ArrayList<>(blobs.keySet());
} else {
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, blobContainer, indexGeneration);
// Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
blobsToDelete = blobs.keySet().stream().filter(blob ->
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|| blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
}

// Delete old index files
final List<String> indexBlobs =
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
snapshotId, shardId), e);
throw e;
}

// Delete all blobs that don't exist in a snapshot
final List<String> orphanedBlobs = blobs.keySet().stream()
.filter(blobName ->
blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
.collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
blobContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs during finalization",
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
snapshotId, shardId), e);
}
} catch (IOException e) {
Expand Down

0 comments on commit 7059224

Please sign in to comment.