Skip to content

Commit

Permalink
[Remote Segment Store] Combine metadata and snapshot files (#7777)
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored Jun 6, 2023
1 parent aa21585 commit 8dd5461
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 114 deletions.
44 changes: 33 additions & 11 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
Expand All @@ -56,6 +57,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -159,6 +161,7 @@
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -195,7 +198,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -223,8 +228,8 @@
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;
import static org.opensearch.index.translog.Translog.Durability;
import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;

/**
* An OpenSearch index shard
Expand Down Expand Up @@ -4477,7 +4482,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
// are uploaded to the remote segment store.
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
((RemoteSegmentStoreDirectory) remoteDirectory).init();
RemoteSegmentMetadata remoteSegmentMetadata = ((RemoteSegmentStoreDirectory) remoteDirectory).init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
.getSegmentsUploadedToRemoteStore();
store.incRef();
Expand All @@ -4499,7 +4504,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
} else {
storeDirectory = store.directory();
}
String segmentInfosSnapshotFilename = null;
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
Expand All @@ -4509,26 +4513,44 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
}
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(file);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file";
segmentInfosSnapshotFilename = file;
}
} else {
skippedSegments.add(file);
}
}
if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {

if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
new ByteArrayIndexInput("Snapshot of SegmentInfos", remoteSegmentMetadata.getSegmentInfosBytes())
);
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.concurrent.GatedCloseable;
Expand All @@ -37,7 +36,6 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -85,7 +83,6 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

private final IndexShard indexShard;
private final Directory storeDirectory;
Expand Down Expand Up @@ -201,7 +198,6 @@ private synchronized void syncSegments(boolean isRetry) {
deleteStaleCommits();
}

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
Expand Down Expand Up @@ -233,16 +229,8 @@ private synchronized void syncSegments(boolean isRetry) {
// Start the segments files upload
boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (newSegmentsUploadStatus) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);

// Start metadata file upload
remoteDirectory.uploadMetadata(
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
Expand All @@ -254,14 +242,6 @@ private synchronized void syncSegments(boolean isRetry) {
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
} finally {
try {
if (segmentInfoSnapshotFilename != null) {
storeDirectory.deleteFile(segmentInfoSnapshotFilename);
}
} catch (IOException e) {
logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
}
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
Expand Down Expand Up @@ -347,22 +327,21 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfosSnapshot);
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos);

SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
segmentInfosSnapshot.write(indexOutput);
}
storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
return segmentInfoSnapshotFilename;
remoteDirectory.uploadMetadata(
localSegmentsPostRefresh,
segmentInfosSnapshot,
storeDirectory,
indexShard.getOperationPrimaryTerm()
);
}

private boolean uploadNewSegments(Collection<String> localSegmentsPostRefresh) throws IOException {
Expand Down
Loading

0 comments on commit 8dd5461

Please sign in to comment.