Skip to content

Commit

Permalink
[Remote Store] Optimize segments metadata upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Singh <[email protected]>
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jun 5, 2023
1 parent 5d5e8ad commit c79b8fc
Showing 1 changed file with 58 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -112,6 +111,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres

private final FileUploader fileUploader;

private final Set<String> segmentFilesInLatestMetadata;

public RemoteStoreRefreshListener(
IndexShard indexShard,
SegmentReplicationCheckpointPublisher checkpointPublisher,
Expand Down Expand Up @@ -152,11 +153,14 @@ public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file));
}
}, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile);
}, remoteDirectory, storeDirectory);
this.segmentFilesInLatestMetadata = new HashSet<>();
}

@Override
public void beforeRefresh() throws IOException {}
public void beforeRefresh() throws IOException {

}

/**
* Upload new segment files created as part of the last refresh to the remote segment store.
Expand Down Expand Up @@ -230,23 +234,25 @@ private synchronized void syncSegments(boolean isRetry) {
// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);

// Start the segments files upload
boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (newSegmentsUploadStatus) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);

// Start metadata file upload
remoteDirectory.uploadMetadata(
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
List<String> segmentFilesToUpload = localSegmentsPostRefresh.stream()
.filter(f -> skipUpload(f) == false)
.collect(Collectors.toList());

if (segmentFilesToUpload.isEmpty() || uploadNewSegments(segmentFilesToUpload)) {
if (segmentFilesInLatestMetadata.equals(localSegmentsPostRefresh) == false) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);
// Start metadata file upload
remoteDirectory.uploadMetadata(
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo, checkpoint);
}
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
checkpointPublisher.publish(indexShard, checkpoint);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
Expand Down Expand Up @@ -283,11 +289,7 @@ private synchronized void syncSegments(boolean isRetry) {
* @param localSegmentsPostRefresh list of segment files present post refresh
*/
private void clearStaleFilesFromLocalSegmentChecksumMap(Collection<String> localSegmentsPostRefresh) {
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
localSegmentChecksumMap.entrySet().removeIf(entry -> localSegmentsPostRefresh.contains(entry.getKey()) == false);
}

private void beforeSegmentsSync(boolean isRetry) {
Expand All @@ -298,7 +300,8 @@ private void beforeSegmentsSync(boolean isRetry) {
segmentTracker.incrementTotalUploadsStarted();
}

private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo, ReplicationCheckpoint checkpoint) {
checkpointPublisher.publish(indexShard, checkpoint);
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet());
// Update the remote refresh time and refresh seq no
Expand All @@ -307,6 +310,9 @@ private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
cancelAndResetScheduledCancellableRetry();
// Keeping track of segment files in latest metadata
segmentFilesInLatestMetadata.clear();
segmentFilesInLatestMetadata.addAll(latestFileNameSizeOnLocalMap.keySet());
}

/**
Expand Down Expand Up @@ -365,6 +371,25 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s
return segmentInfoSnapshotFilename;
}

/**
* Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded.
*
* @param file that needs to be uploaded.
* @return true if the upload has to be skipped for the file.
*/
private boolean skipUpload(String file) {
try {
// Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded.
return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, getChecksumOfLocalFile(file));
} catch (IOException e) {
logger.error(
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file",
file
);
}
return false;
}

private boolean uploadNewSegments(Collection<String> localSegmentsPostRefresh) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
localSegmentsPostRefresh.forEach(file -> {
Expand Down Expand Up @@ -444,13 +469,14 @@ private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {

private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
if (uploadStatus) {
long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload;
long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L;

segmentTracker.incrementTotalUploadsSucceeded();
segmentTracker.addUploadBytes(bytesUploaded);
segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS);
segmentTracker.addUploadTimeMs(timeTakenInMS);
long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload;
if (bytesUploaded > 0) {
long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L;
segmentTracker.addUploadBytes(bytesUploaded);
segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS);
segmentTracker.addUploadTimeMs(timeTakenInMS);
}
} else {
segmentTracker.incrementTotalUploadsFailed();
}
Expand All @@ -470,18 +496,10 @@ private static class FileUploader {

private final Directory storeDirectory;

private final CheckedFunction<String, String, IOException> checksumProvider;

public FileUploader(
UploadTracker uploadTracker,
RemoteSegmentStoreDirectory remoteDirectory,
Directory storeDirectory,
CheckedFunction<String, String, IOException> checksumProvider
) {
public FileUploader(UploadTracker uploadTracker, RemoteSegmentStoreDirectory remoteDirectory, Directory storeDirectory) {
this.uploadTracker = uploadTracker;
this.remoteDirectory = remoteDirectory;
this.storeDirectory = storeDirectory;
this.checksumProvider = checksumProvider;
}

/**
Expand All @@ -492,9 +510,6 @@ public FileUploader(
* @throws IOException is thrown if the upload fails.
*/
private void uploadFile(String file) throws IOException {
if (skipUpload(file)) {
return;
}
uploadTracker.beforeUpload(file);
boolean success = false;
try {
Expand All @@ -508,25 +523,6 @@ private void uploadFile(String file) throws IOException {
}
}

/**
* Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded.
*
* @param file that needs to be uploaded.
* @return true if the upload has to be skipped for the file.
*/
private boolean skipUpload(String file) {
try {
// Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded.
return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file));
} catch (IOException e) {
logger.error(
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file",
file
);
}
return false;
}

/**
* This method does the actual upload.
*
Expand Down

0 comments on commit c79b8fc

Please sign in to comment.