From c79b8fc1cd984e61000eca698b4e71e13171276c Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 3 Jun 2023 19:32:49 +0530 Subject: [PATCH] [Remote Store] Optimize segments metadata upload Signed-off-by: Ashish Singh Signed-off-by: Singh Signed-off-by: Ashish Singh --- .../shard/RemoteStoreRefreshListener.java | 120 +++++++++--------- 1 file changed, 58 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 88b71a92d7340..208efee0d0756 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -21,7 +21,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.action.bulk.BackoffPolicy; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; @@ -112,6 +111,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final FileUploader fileUploader; + private final Set segmentFilesInLatestMetadata; + public RemoteStoreRefreshListener( IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, @@ -152,11 +153,14 @@ public void onFailure(String file) { // Track upload failure segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); } - }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile); + }, remoteDirectory, storeDirectory); + this.segmentFilesInLatestMetadata = new HashSet<>(); } @Override - public void beforeRefresh() throws IOException {} + public void beforeRefresh() throws IOException { + + } /** * Upload new segment files created as part of the last refresh to the remote segment store. @@ -230,23 +234,25 @@ private synchronized void syncSegments(boolean isRetry) { // Create a map of file name to size and update the refresh segment tracker updateLocalSizeMapAndTracker(localSegmentsPostRefresh); - // Start the segments files upload - boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (newSegmentsUploadStatus) { - segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); - localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); - - // Start metadata file upload - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - storeDirectory, - indexShard.getOperationPrimaryTerm(), - segmentInfos.getGeneration() - ); - clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); - onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo); + List segmentFilesToUpload = localSegmentsPostRefresh.stream() + .filter(f -> skipUpload(f) == false) + .collect(Collectors.toList()); + + if (segmentFilesToUpload.isEmpty() || uploadNewSegments(segmentFilesToUpload)) { + if (segmentFilesInLatestMetadata.equals(localSegmentsPostRefresh) == false) { + segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); + localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); + // Start metadata file upload + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() + ); + clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); + onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo, checkpoint); + } indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - checkpointPublisher.publish(indexShard, checkpoint); // At this point since we have uploaded new segments, segment infos and segment metadata file, // along with marking minSeqNoToKeep, upload has succeeded completely. shouldRetry = false; @@ -283,11 +289,7 @@ private synchronized void syncSegments(boolean isRetry) { * @param localSegmentsPostRefresh list of segment files present post refresh */ private void clearStaleFilesFromLocalSegmentChecksumMap(Collection localSegmentsPostRefresh) { - localSegmentChecksumMap.keySet() - .stream() - .filter(file -> !localSegmentsPostRefresh.contains(file)) - .collect(Collectors.toSet()) - .forEach(localSegmentChecksumMap::remove); + localSegmentChecksumMap.entrySet().removeIf(entry -> localSegmentsPostRefresh.contains(entry.getKey()) == false); } private void beforeSegmentsSync(boolean isRetry) { @@ -298,7 +300,8 @@ private void beforeSegmentsSync(boolean isRetry) { segmentTracker.incrementTotalUploadsStarted(); } - private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) { + private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo, ReplicationCheckpoint checkpoint) { + checkpointPublisher.publish(indexShard, checkpoint); // Update latest uploaded segment files name in segment tracker segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet()); // Update the remote refresh time and refresh seq no @@ -307,6 +310,9 @@ private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) { resetBackOffDelayIterator(); // Cancel the scheduled cancellable retry if possible and set it to null cancelAndResetScheduledCancellableRetry(); + // Keeping track of segment files in latest metadata + segmentFilesInLatestMetadata.clear(); + segmentFilesInLatestMetadata.addAll(latestFileNameSizeOnLocalMap.keySet()); } /** @@ -365,6 +371,25 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s return segmentInfoSnapshotFilename; } + /** + * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. + * + * @param file that needs to be uploaded. + * @return true if the upload has to be skipped for the file. + */ + private boolean skipUpload(String file) { + try { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. + return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); + } catch (IOException e) { + logger.error( + "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", + file + ); + } + return false; + } + private boolean uploadNewSegments(Collection localSegmentsPostRefresh) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); localSegmentsPostRefresh.forEach(file -> { @@ -444,13 +469,14 @@ private void updateLocalSizeMapAndTracker(Collection segmentFiles) { private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { if (uploadStatus) { - long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload; - long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; - segmentTracker.incrementTotalUploadsSucceeded(); - segmentTracker.addUploadBytes(bytesUploaded); - segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); - segmentTracker.addUploadTimeMs(timeTakenInMS); + long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload; + if (bytesUploaded > 0) { + long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; + segmentTracker.addUploadBytes(bytesUploaded); + segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); + segmentTracker.addUploadTimeMs(timeTakenInMS); + } } else { segmentTracker.incrementTotalUploadsFailed(); } @@ -470,18 +496,10 @@ private static class FileUploader { private final Directory storeDirectory; - private final CheckedFunction checksumProvider; - - public FileUploader( - UploadTracker uploadTracker, - RemoteSegmentStoreDirectory remoteDirectory, - Directory storeDirectory, - CheckedFunction checksumProvider - ) { + public FileUploader(UploadTracker uploadTracker, RemoteSegmentStoreDirectory remoteDirectory, Directory storeDirectory) { this.uploadTracker = uploadTracker; this.remoteDirectory = remoteDirectory; this.storeDirectory = storeDirectory; - this.checksumProvider = checksumProvider; } /** @@ -492,9 +510,6 @@ public FileUploader( * @throws IOException is thrown if the upload fails. */ private void uploadFile(String file) throws IOException { - if (skipUpload(file)) { - return; - } uploadTracker.beforeUpload(file); boolean success = false; try { @@ -508,25 +523,6 @@ private void uploadFile(String file) throws IOException { } } - /** - * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. - * - * @param file that needs to be uploaded. - * @return true if the upload has to be skipped for the file. - */ - private boolean skipUpload(String file) { - try { - // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. - return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file)); - } catch (IOException e) { - logger.error( - "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", - file - ); - } - return false; - } - /** * This method does the actual upload. *