Skip to content

Commit

Permalink
Add async segment file download support from remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Sep 1, 2023
1 parent ff4b23b commit b1cfad2
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 30 deletions.
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public synchronized IndexShard createShard(
Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Expand All @@ -488,7 +488,8 @@ public synchronized IndexShard createShard(
this.indexSettings,
directory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
path
);
eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
Expand Down
90 changes: 78 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.replication.PendingReplicationActions;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.metadata.DataStream;
Expand Down Expand Up @@ -196,6 +198,7 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -212,6 +215,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -4806,39 +4810,101 @@ private String copySegmentFiles(
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments,
boolean overrideLocal
) throws IOException {
List<String> downloadedSegments = new ArrayList<>();
List<String> skippedSegments = new ArrayList<>();
Set<String> toDownloadSegments = new HashSet<>();
Set<String> skippedSegments = new HashSet<>();
String segmentNFile = null;

try {
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
if (overrideLocal) {
for (String file : localSegmentFiles) {
storeDirectory.deleteFile(file);
}
deleteExistingSegments(storeDirectory);
}

for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(file);
toDownloadSegments.add(file);
} else {
skippedSegments.add(file);
}
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
}

if (file.startsWith(IndexFileNames.SEGMENTS)) {
assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file";
segmentNFile = file;
}
}

if (!toDownloadSegments.isEmpty()) {
try {
CountDownLatch completionLatch = new CountDownLatch(1);
LatchedActionListener<Void> downloadsCompletionListener = new LatchedActionListener<>(
new PlainActionFuture<>(),
completionLatch
);

downloadSegments(
storeDirectory,
sourceRemoteDirectory,
targetRemoteDirectory,
toDownloadSegments,
downloadsCompletionListener
);

completionLatch.await();
} catch (Exception e) {
throw new IOException("Error occurred when downloading segments from remote store", e);

Check warning on line 4854 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4853-L4854

Added lines #L4853 - L4854 were not covered by tests
}
}
} finally {
logger.trace("Downloaded segments here: {}", downloadedSegments);
logger.trace("Downloaded segments here: {}}", toDownloadSegments);
logger.trace("Skipped download for segments here: {}", skippedSegments);
}

return segmentNFile;
}

private void downloadSegments(
Directory storeDirectory,
RemoteSegmentStoreDirectory sourceRemoteDirectory,
RemoteSegmentStoreDirectory targetRemoteDirectory,
Set<String> toDownloadSegments,
ActionListener<Void> completionListener
) {
final AtomicInteger totalDownloadedSegments = new AtomicInteger(toDownloadSegments.size());
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();

final ActionListener<String> segmentsDownloadListener = new ActionListener<>() {
@Override
public void onResponse(String fileName) {
try {
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT);
}
if (totalDownloadedSegments.decrementAndGet() == 0) {
completionListener.onResponse(null);
}
} catch (IOException ex) {
logger.error("Failed to download segment file {} from the remote store", fileName);
onFailure(ex);

Check warning on line 4887 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4885-L4887

Added lines #L4885 - L4887 were not covered by tests
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to download one of the segment files from the remote store", e);
completionListener.onFailure(e);
}

Check warning on line 4895 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4893-L4895

Added lines #L4893 - L4895 were not covered by tests
};

toDownloadSegments.forEach(file -> sourceRemoteDirectory.copyTo(storeDirectory, file, indexPath, segmentsDownloadListener));
}

private void deleteExistingSegments(Directory storeDirectory) throws IOException {
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
for (String file : localSegmentFiles) {
storeDirectory.deleteFile(file);
}

Check warning on line 4905 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4904-L4905

Added lines #L4904 - L4905 were not covered by tests
}

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
Expand All @@ -39,6 +40,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -434,6 +436,30 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
}
}

/**
* Copies the segment from the remote directory to a local directory, preferably using an async mechanism using multi stream support.
* @param storeDirectory The directory where the segment needs to be copied to
* @param src The name of the segment file
* @param segmentDirectoryPath The file path for the segment directory
* @param segmentCompletionListener Async listener which should be notified once the segment is downloaded or in case of failure
*/
public void copyTo(Directory storeDirectory, String src, Path segmentDirectoryPath, ActionListener<String> segmentCompletionListener) {
final String blobName = getExistingRemoteFilename(src);
if (segmentDirectoryPath != null && remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer) {
VerifyingMultiStreamBlobContainer blobContainer = (VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
Path destinationPath = segmentDirectoryPath.resolve(src);
blobContainer.asyncBlobDownload(blobName, destinationPath, threadPool, segmentCompletionListener);
} else {
// Fallback to older mechanism of downloading the file
try {
storeDirectory.copyFrom(this, src, src, IOContext.DEFAULT);
segmentCompletionListener.onResponse(src);
} catch (IOException e) {
segmentCompletionListener.onFailure(e);
}
}
}

/**
* This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo}
*
Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.Lucene;
Expand All @@ -92,6 +93,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.translog.Translog;

import java.io.Closeable;
Expand Down Expand Up @@ -179,6 +181,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
private final ShardPath shardPath;

// used to ref count files when a new Reader is opened for PIT/Scroll queries
// prevents segment files deletion until the PIT/Scroll expires or is discarded
Expand All @@ -192,17 +195,25 @@ protected void closeInternal() {
};

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY, null);
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) {
public Store(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
OnClose onClose,
ShardPath shardPath
) {
super(shardId, indexSettings);
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;
this.shardPath = shardPath;
assert onClose != null;
assert shardLock != null;
assert shardLock.getShardId().equals(shardId);
Expand All @@ -213,6 +224,11 @@ public Directory directory() {
return directory;
}

@InternalApi
public ShardPath shardPath() {
return shardPath;
}

/**
* Returns the last committed segments info for this store
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,27 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -106,31 +110,69 @@ public void getSegmentFiles(
logger.trace("Downloading segments files from remote store {}", filesToFetch);

RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
List<StoreFileMetadata> toDownloadSegments = new ArrayList<>();
Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
if (remoteSegmentMetadata != null) {
try {
indexShard.store().incRef();
indexShard.remoteStore().incRef();
final Directory storeDirectory = indexShard.store().directory();
final ShardPath shardPath = indexShard.shardPath();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(fileMetadata);
toDownloadSegments.add(fileMetadata);
}
logger.trace("Downloaded segments from remote store {}", downloadedSegments);

CountDownLatch countDownLatch = new CountDownLatch(1);
LatchedActionListener<GetSegmentFilesResponse> completionListener = new LatchedActionListener<>(
listener,
countDownLatch
);
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, completionListener);
countDownLatch.await();

logger.trace("Downloaded segments from remote store {}", toDownloadSegments);
} finally {
indexShard.store().decRef();
indexShard.remoteStore().decRef();
}
}
listener.onResponse(new GetSegmentFilesResponse(downloadedSegments));
} catch (Exception e) {
listener.onFailure(e);
}
}

private void downloadSegments(
Directory storeDirectory,
RemoteSegmentStoreDirectory remoteStoreDirectory,
List<StoreFileMetadata> toDownloadSegments,
ShardPath shardPath,
ActionListener<GetSegmentFilesResponse> completionListener
) {

final AtomicInteger totalDownloadedSegments = new AtomicInteger(toDownloadSegments.size());
final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
final ActionListener<String> segmentsDownloadListener = new ActionListener<>() {
@Override
public void onResponse(String fileName) {
if (totalDownloadedSegments.decrementAndGet() == 0) {
completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to download one of the segment files from the remote store", e);
completionListener.onFailure(e);
}

Check warning on line 168 in server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java#L166-L168

Added lines #L166 - L168 were not covered by tests
};

toDownloadSegments.forEach(
fileMetadata -> remoteStoreDirectory.copyTo(storeDirectory, fileMetadata.name(), indexPath, segmentsDownloadListener)
);
}

@Override
public String getDescription() {
return "RemoteStoreReplicationSource";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ public Set<String> getPendingDeletions() throws IOException {
}
};

try (Store store = createStore(shardId, new IndexSettings(metadata, Settings.EMPTY), directory)) {
try (Store store = createStore(shardId, new IndexSettings(metadata, Settings.EMPTY), directory, shardPath)) {
IndexShard shard = newShard(
shardRouting,
shardPath,
Expand Down
Loading

0 comments on commit b1cfad2

Please sign in to comment.