Skip to content

Commit

Permalink
Fix deletion permits flow in RemoteFsTimestampAwareTranslog (#16282) (#…
Browse files Browse the repository at this point in the history
…16336)

---------


(cherry picked from commit 23d1c7a)

Signed-off-by: Sachin Kale <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 0053014 commit df52485
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,42 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
try {
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
} catch (Exception e) {
logger.error("Exception in delete generations flow", e);
// Release permit that is meant for metadata files and return
remoteGenerationDeletionPermits.release();
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
return;
}
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);
try {
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);
} catch (Exception e) {
logger.error("Exception in delete metadata files flow", e);
// Permits is already released by deleteMetadataFilesAsync
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
return;
}

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
Expand All @@ -240,7 +261,12 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
remoteGenerationDeletionPermits.release();
}
} catch (Exception e) {
logger.error("Exception in trimUnreferencedReaders", e);
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
}
}

Expand Down Expand Up @@ -441,7 +467,8 @@ protected static void deleteStaleRemotePrimaryTerms(
}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger)
.v1();
} catch (IOException e) {
return Long.MIN_VALUE;
}
Expand Down Expand Up @@ -482,7 +509,8 @@ protected static Long getMinPrimaryTermInRemote(
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
Logger logger
) throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
Expand All @@ -504,6 +532,8 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
if (primaryTerm.isPresent()) {
minPrimaryTem = primaryTerm.get();
}
} else {
logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile);
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,14 @@ protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
try {
deleteRemoteGeneration(generationsToDelete);
} catch (Exception e) {
logger.error("Exception in delete generations flow", e);
// Release permit that is meant for metadata files and return
remoteGenerationDeletionPermits.release();
return;
}
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTerms();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,19 +496,24 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
if (isTranslogMetadataEnabled == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
try {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
if (isTranslogMetadataEnabled == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
} catch (Exception e) {
onCompletion.run();
throw e;
}
}

/**
Expand Down Expand Up @@ -658,37 +663,32 @@ public void deleteTranslogFiles() throws IOException {
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteTranslogFilesAsync(long primaryTerm, List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
onCompletion.run();
}
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
onCompletion.run();
}

@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
);
} catch (Exception e) {
onCompletion.run();
throw e;
}
}
);
}

/**
Expand Down
Loading

0 comments on commit df52485

Please sign in to comment.