From 31ea3ec5e969658c099e05bce6d4b545d97fbc09 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Fri, 1 Sep 2023 15:53:43 -0700 Subject: [PATCH] Add async segment file download support from remote store Signed-off-by: Kunal Kotwani --- .../multipart/RemoteStoreMultipartIT.java | 10 ++ .../common/settings/FeatureFlagSettings.java | 1 + .../opensearch/common/util/FeatureFlags.java | 11 ++ .../org/opensearch/index/IndexService.java | 5 +- .../opensearch/index/shard/IndexShard.java | 79 ++++++++++-- .../store/RemoteSegmentStoreDirectory.java | 40 ++++++ .../org/opensearch/index/store/Store.java | 20 ++- .../RemoteStoreReplicationSource.java | 54 ++++++++- .../index/shard/IndexShardTests.java | 2 +- .../RemoteSegmentStoreDirectoryTests.java | 114 ++++++++++++++++++ .../opensearch/index/store/StoreTests.java | 23 +++- .../index/shard/IndexShardTestCase.java | 13 +- 12 files changed, 343 insertions(+), 29 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java index 21f48ba99e651..6241cd51b6488 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; import org.opensearch.remotestore.RemoteStoreIT; @@ -86,6 +87,15 @@ public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String nam } else { return super.buildRepositoryMetadata(node, name); } + + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE_EXPERIMENTAL_SETTING.getKey(), Boolean.TRUE.toString()) + .build(); } public void testRateLimitedRemoteUploads() throws Exception { diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 90abc0a0765c1..307de6ec3fe93 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -36,6 +36,7 @@ protected FeatureFlagSettings( new HashSet<>( Arrays.asList( FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING, + FeatureFlags.REMOTE_STORE_EXPERIMENTAL_SETTING, FeatureFlags.EXTENSIONS_SETTING, FeatureFlags.IDENTITY_SETTING, FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index b89d2d0549823..f0bbba2c500c1 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -26,6 +26,11 @@ public class FeatureFlags { public static final String SEGMENT_REPLICATION_EXPERIMENTAL = "opensearch.experimental.feature.segment_replication_experimental.enabled"; + /** + * Gates the visibility of the remote store experimental features that allows users to test unreleased beta features. + */ + public static final String REMOTE_STORE_EXPERIMENTAL = "opensearch.experimental.feature.remote_store_experimental.enabled"; + /** * Gates the ability for Searchable Snapshots to read snapshots that are older than the * guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis. @@ -89,6 +94,12 @@ public static boolean isEnabled(String featureFlagName) { Property.NodeScope ); + public static final Setting REMOTE_STORE_EXPERIMENTAL_SETTING = Setting.boolSetting( + REMOTE_STORE_EXPERIMENTAL, + false, + Property.NodeScope + ); + public static final Setting EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope); public static final Setting IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope); diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 80ead0a333ba3..20095df355d0a 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -479,7 +479,7 @@ public synchronized IndexShard createShard( Store remoteStore = null; if (this.indexSettings.isRemoteStoreEnabled()) { Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); - remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); + remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path); } Directory directory = directoryFactory.newDirectory(this.indexSettings, path); @@ -488,7 +488,8 @@ public synchronized IndexShard createShard( this.indexSettings, directory, lock, - new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)) + new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)), + path ); eventListener.onStoreCreated(shardId); indexShard = new IndexShard( diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8ed75330f938e..64a3ecb8da3d8 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -96,7 +96,6 @@ import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.common.util.set.Sets; import org.opensearch.core.Assertions; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeValue; @@ -198,6 +197,7 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -4833,39 +4833,96 @@ private String copySegmentFiles( Map uploadedSegments, boolean overrideLocal ) throws IOException { - List downloadedSegments = new ArrayList<>(); - List skippedSegments = new ArrayList<>(); + Set toDownloadSegments = new HashSet<>(); + Set skippedSegments = new HashSet<>(); String segmentNFile = null; + try { - Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); if (overrideLocal) { - for (String file : localSegmentFiles) { + for (String file : storeDirectory.listAll()) { storeDirectory.deleteFile(file); } } + for (String file : uploadedSegments.keySet()) { long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { - storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT); - downloadedSegments.add(file); + toDownloadSegments.add(file); } else { skippedSegments.add(file); } - if (targetRemoteDirectory != null) { - targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } + if (file.startsWith(IndexFileNames.SEGMENTS)) { assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file"; segmentNFile = file; } } + + if (toDownloadSegments.isEmpty() == false) { + try { + final CountDownLatch completionLatch = new CountDownLatch(toDownloadSegments.size()); + final AtomicBoolean anyFileFailed = new AtomicBoolean(); + + downloadSegments( + storeDirectory, + sourceRemoteDirectory, + targetRemoteDirectory, + toDownloadSegments, + completionLatch, + anyFileFailed + ); + + completionLatch.await(); + if (anyFileFailed.get()) { + throw new IOException("Error occurred in one or more segment file downloads"); + } + } catch (Exception e) { + throw new IOException("Error occurred when downloading segments from remote store", e); + } + } } finally { - logger.trace("Downloaded segments here: {}", downloadedSegments); + logger.trace("Downloaded segments here: {}", toDownloadSegments); logger.trace("Skipped download for segments here: {}", skippedSegments); } + return segmentNFile; } + private void downloadSegments( + Directory storeDirectory, + RemoteSegmentStoreDirectory sourceRemoteDirectory, + RemoteSegmentStoreDirectory targetRemoteDirectory, + Set toDownloadSegments, + CountDownLatch completionLatch, + AtomicBoolean anyFileFailed + ) { + final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); + + final ActionListener segmentsDownloadListener = new ActionListener<>() { + @Override + public void onResponse(String fileName) { + try { + if (targetRemoteDirectory != null) { + targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT); + } + completionLatch.countDown(); + } catch (IOException ex) { + logger.error("Failed to download segment file {} from the remote store", fileName); + onFailure(ex); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to download one of the segment files from the remote store", e); + anyFileFailed.set(true); + completionLatch.countDown(); + } + }; + + toDownloadSegments.forEach(file -> sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener)); + } + private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 266552cea93d9..d5d4c16318a32 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -24,8 +24,10 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.action.ActionListener; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.store.lockmanager.FileLockInfo; @@ -40,6 +42,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -434,6 +437,43 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } } + /** + * Copies an existing {@code source} file from this directory to a non-existent file (also + * named {@code source}) in either {@code destinationDirectory} or {@code destinationPath}. + * If the blob container backing this directory supports multipart downloads, the {@code source} + * file will be downloaded (potentially in multiple concurrent parts) directly to + * {@code destinationPath}. This method will return immediately and {@code fileCompletionListener} + * will be notified upon completion. + *

+ * If multipart downloads are not supported, then {@code source} file will be copied to a file named + * {@code source} in a single part to {@code destinationDirectory}. The download will happen on the + * calling thread and {@code fileCompletionListener} will be notified synchronously before this + * method returns. + * + * @param source The source file name + * @param destinationDirectory The destination directory (if multipart is not supported) + * @param destinationPath The destination path (if multipart is supported) + * @param fileCompletionListener The listener to notify of completion + */ + public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener fileCompletionListener) { + final String blobName = getExistingRemoteFilename(source); + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_EXPERIMENTAL) + && destinationPath != null + && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { + final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer(); + final Path destinationFilePath = destinationPath.resolve(source); + blobContainer.asyncBlobDownload(blobName, destinationFilePath, threadPool, fileCompletionListener); + } else { + // Fallback to older mechanism of downloading the file + try { + destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT); + fileCompletionListener.onResponse(source); + } catch (IOException e) { + fileCompletionListener.onFailure(e); + } + } + } + /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index b3ea2cdd02e21..285241ba89996 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -66,6 +66,7 @@ import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; import org.opensearch.common.UUIDs; +import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.Lucene; @@ -92,6 +93,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.translog.Translog; import java.io.Closeable; @@ -179,6 +181,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock(); private final ShardLock shardLock; private final OnClose onClose; + private final ShardPath shardPath; // used to ref count files when a new Reader is opened for PIT/Scroll queries // prevents segment files deletion until the PIT/Scroll expires or is discarded @@ -192,10 +195,17 @@ protected void closeInternal() { }; public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) { - this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY); + this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY, null); } - public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) { + public Store( + ShardId shardId, + IndexSettings indexSettings, + Directory directory, + ShardLock shardLock, + OnClose onClose, + ShardPath shardPath + ) { super(shardId, indexSettings); final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval); @@ -203,6 +213,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); this.shardLock = shardLock; this.onClose = onClose; + this.shardPath = shardPath; assert onClose != null; assert shardLock != null; assert shardLock.getShardId().equals(shardId); @@ -213,6 +224,11 @@ public Directory directory() { return directory; } + @InternalApi + public ShardPath shardPath() { + return shardPath; + } + /** * Returns the last committed segments info for this store * diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 7252fea044a02..8dde7d7be40fd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -13,23 +13,27 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -106,31 +110,69 @@ public void getSegmentFiles( logger.trace("Downloading segments files from remote store {}", filesToFetch); RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - List downloadedSegments = new ArrayList<>(); + List toDownloadSegments = new ArrayList<>(); Collection directoryFiles = List.of(indexShard.store().directory().listAll()); if (remoteSegmentMetadata != null) { try { indexShard.store().incRef(); indexShard.remoteStore().incRef(); final Directory storeDirectory = indexShard.store().directory(); + final ShardPath shardPath = indexShard.shardPath(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - downloadedSegments.add(fileMetadata); + toDownloadSegments.add(fileMetadata); } - logger.trace("Downloaded segments from remote store {}", downloadedSegments); + + CountDownLatch countDownLatch = new CountDownLatch(1); + LatchedActionListener completionListener = new LatchedActionListener<>( + listener, + countDownLatch + ); + downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, completionListener); + countDownLatch.await(); + + logger.trace("Downloaded segments from remote store {}", toDownloadSegments); } finally { indexShard.store().decRef(); indexShard.remoteStore().decRef(); } } - listener.onResponse(new GetSegmentFilesResponse(downloadedSegments)); } catch (Exception e) { listener.onFailure(e); } } + private void downloadSegments( + Directory storeDirectory, + RemoteSegmentStoreDirectory remoteStoreDirectory, + List toDownloadSegments, + ShardPath shardPath, + ActionListener completionListener + ) { + + final AtomicInteger totalDownloadedSegments = new AtomicInteger(toDownloadSegments.size()); + final Path indexPath = shardPath == null ? null : shardPath.resolveIndex(); + final ActionListener segmentsDownloadListener = new ActionListener<>() { + @Override + public void onResponse(String fileName) { + if (totalDownloadedSegments.decrementAndGet() == 0) { + completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments)); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to download one of the segment files from the remote store", e); + completionListener.onFailure(e); + } + }; + + toDownloadSegments.forEach( + fileMetadata -> remoteStoreDirectory.copyTo(fileMetadata.name(), storeDirectory, indexPath, segmentsDownloadListener) + ); + } + @Override public String getDescription() { return "RemoteStoreReplicationSource"; diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index cd272d8f626d0..b2eb41828a4df 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -1780,7 +1780,7 @@ public Set getPendingDeletions() throws IOException { } }; - try (Store store = createStore(shardId, new IndexSettings(metadata, Settings.EMPTY), directory)) { + try (Store store = createStore(shardId, new IndexSettings(metadata, Settings.EMPTY), directory, shardPath)) { IndexShard shard = newShard( shardRouting, shardPath, diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 2948528ad82e1..07ee137c1491d 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -28,6 +28,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; @@ -46,6 +47,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -61,6 +63,8 @@ import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -516,6 +520,116 @@ public void onFailure(Exception e) {} storeDirectory.close(); } + public void testCopyFilesToMultipart() throws Exception { + Settings settings = Settings.builder() + .put(FeatureFlags.REMOTE_STORE_EXPERIMENTAL_SETTING.getKey(), Boolean.TRUE.toString()) + .build(); + FeatureFlags.initializeFeatureFlags(settings); + + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(3); + completionListener.onResponse(invocation.getArgument(0)); + return null; + }).when(blobContainer).asyncBlobDownload(any(), any(), any(), any()); + + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener() { + @Override + public void onResponse(String unused) { + downloadLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + Path path = createTempDir(); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS)); + verify(blobContainer, times(1)).asyncBlobDownload(contains(filename), eq(path.resolve(filename)), any(), any()); + verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any()); + } + + public void testCopyFilesTo() throws Exception { + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(String unused) { + downloadLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + Path path = createTempDir(); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); + verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); + } + + public void testCopyFilesToEmptyPath() throws Exception { + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(String unused) { + downloadLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); + verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); + } + + public void testCopyFilesToException() throws Exception { + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + Mockito.doThrow(new IOException()) + .when(storeDirectory) + .copyFrom(any(Directory.class), anyString(), anyString(), any(IOContext.class)); + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(String unused) { + + } + + @Override + public void onFailure(Exception e) { + downloadLatch.countDown(); + } + }; + Path path = createTempDir(); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); + verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); + } + public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 8395b3e8ac08e..d7d326b325cc6 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -84,6 +84,7 @@ import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; @@ -798,7 +799,7 @@ public void testOnCloseCallback() throws IOException { assertEquals(shardId, theLock.getShardId()); assertEquals(lock, theLock); count.incrementAndGet(); - }); + }, null); assertEquals(count.get(), 0); final int iters = randomIntBetween(1, 10); @@ -809,6 +810,26 @@ public void testOnCloseCallback() throws IOException { assertEquals(count.get(), 1); } + public void testStoreShardPath() { + final ShardId shardId = new ShardId("index", "_na_", 1); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)) + .build(); + final Path path = createTempDir().resolve(shardId.getIndex().getUUID()).resolve(String.valueOf(shardId.id())); + final ShardPath shardPath = new ShardPath(false, path, path, shardId); + final Store store = new Store( + shardId, + IndexSettingsModule.newIndexSettings("index", settings), + StoreTests.newDirectory(random()), + new DummyShardLock(shardId), + Store.OnClose.EMPTY, + shardPath + ); + assertEquals(shardPath, store.shardPath()); + store.close(); + } + public void testStoreStats() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); Settings settings = Settings.builder() diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 43f2cce668e81..7a54c32248218 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -272,11 +272,11 @@ public Settings threadPoolSettings() { } protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { - return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex())); + return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()), shardPath); } - protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException { - return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); + protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardPath shardPath) throws IOException { + return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId), Store.OnClose.EMPTY, shardPath); } protected Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { @@ -654,7 +654,7 @@ protected IndexShard newShard( remotePath = createTempDir(); } - remoteStore = createRemoteStore(remotePath, routing, indexMetadata); + remoteStore = createRemoteStore(remotePath, routing, indexMetadata, shardPath); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, indexSettings.getSettings()); BlobStoreRepository repo = createRepository(remotePath); @@ -768,11 +768,12 @@ protected RepositoriesService createRepositoriesService() { return repositoriesService; } - protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { + protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata, ShardPath shardPath) + throws IOException { Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); ShardId shardId = shardRouting.shardId(); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = createRemoteSegmentStoreDirectory(shardId, path); - return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); + return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory, shardPath); } protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId shardId, Path path) throws IOException {