From d31c829498d1d9c19d1e3c14a16678c223a7a8f7 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Fri, 1 Sep 2023 15:53:43 -0700 Subject: [PATCH] Add async segment file download support from remote store Signed-off-by: Kunal Kotwani --- .../multipart/RemoteStoreMultipartIT.java | 1 + .../org/opensearch/index/IndexService.java | 5 +- .../opensearch/index/shard/IndexShard.java | 60 ++++++++-- .../store/RemoteSegmentStoreDirectory.java | 37 ++++++ .../org/opensearch/index/store/Store.java | 20 +++- .../RemoteStoreReplicationSource.java | 32 ++++- .../index/shard/IndexShardTests.java | 2 +- .../RemoteSegmentStoreDirectoryTests.java | 112 ++++++++++++++++++ .../opensearch/index/store/StoreTests.java | 23 +++- .../index/shard/IndexShardTestCase.java | 13 +- 10 files changed, 274 insertions(+), 31 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java index 3ceb09e44ae36..98fab139f4902 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -104,6 +104,7 @@ public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String nam } else { return super.buildRepositoryMetadata(node, name); } + } public void testRateLimitedRemoteUploads() throws Exception { diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 38456ad3e43a5..ca0cc307e460b 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -481,7 +481,7 @@ public synchronized IndexShard createShard( Store remoteStore = null; if (this.indexSettings.isRemoteStoreEnabled()) { Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); - remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); + remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path); } Directory directory = directoryFactory.newDirectory(this.indexSettings, path); @@ -490,7 +490,8 @@ public synchronized IndexShard createShard( this.indexSettings, directory, lock, - new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)) + new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)), + path ); eventListener.onStoreCreated(shardId); indexShard = new IndexShard( diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 34c5ed2112482..6ab2110370f52 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -62,6 +62,8 @@ import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.PendingReplicationActions; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.DataStream; @@ -96,7 +98,6 @@ import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.common.util.set.Sets; import org.opensearch.core.Assertions; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeValue; @@ -199,6 +200,7 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -854,7 +856,7 @@ public void relocated( synchronized (mutex) { verifyRelocatingState(); replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under - // mutex + // mutex } } catch (final Exception e) { try { @@ -3790,7 +3792,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro replicationTracker::isPrimaryMode, translogFactorySupplier.apply(indexSettings, shardRouting), isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for - // timeseries + // timeseries ); } @@ -4853,39 +4855,71 @@ private String copySegmentFiles( Map uploadedSegments, boolean overrideLocal ) throws IOException { - List downloadedSegments = new ArrayList<>(); - List skippedSegments = new ArrayList<>(); + Set toDownloadSegments = new HashSet<>(); + Set skippedSegments = new HashSet<>(); String segmentNFile = null; + try { - Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); if (overrideLocal) { - for (String file : localSegmentFiles) { + for (String file : storeDirectory.listAll()) { storeDirectory.deleteFile(file); } } + for (String file : uploadedSegments.keySet()) { long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { - storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT); - downloadedSegments.add(file); + toDownloadSegments.add(file); } else { skippedSegments.add(file); } - if (targetRemoteDirectory != null) { - targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } + if (file.startsWith(IndexFileNames.SEGMENTS)) { assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file"; segmentNFile = file; } } + + if (toDownloadSegments.isEmpty() == false) { + try { + final PlainActionFuture completionListener = PlainActionFuture.newFuture(); + downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, completionListener); + completionListener.actionGet(); + } catch (Exception e) { + throw new IOException("Error occurred when downloading segments from remote store", e); + } + } } finally { - logger.trace("Downloaded segments here: {}", downloadedSegments); + logger.trace("Downloaded segments here: {}", toDownloadSegments); logger.trace("Skipped download for segments here: {}", skippedSegments); } + return segmentNFile; } + private void downloadSegments( + Directory storeDirectory, + RemoteSegmentStoreDirectory sourceRemoteDirectory, + RemoteSegmentStoreDirectory targetRemoteDirectory, + Set toDownloadSegments, + ActionListener completionListener + ) { + final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); + final GroupedActionListener batchDownloadListener = new GroupedActionListener<>( + ActionListener.map(completionListener, v -> null), + toDownloadSegments.size() + ); + + final ActionListener segmentsDownloadListener = ActionListener.map(batchDownloadListener, fileName -> { + if (targetRemoteDirectory != null) { + targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT); + } + return null; + }); + + toDownloadSegments.forEach(file -> sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener)); + } + private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 266552cea93d9..a68fed5e302bd 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -24,6 +24,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; @@ -40,6 +41,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -434,6 +436,41 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } } + /** + * Copies an existing {@code source} file from this directory to a non-existent file (also + * named {@code source}) in either {@code destinationDirectory} or {@code destinationPath}. + * If the blob container backing this directory supports multipart downloads, the {@code source} + * file will be downloaded (potentially in multiple concurrent parts) directly to + * {@code destinationPath}. This method will return immediately and {@code fileCompletionListener} + * will be notified upon completion. + *

+ * If multipart downloads are not supported, then {@code source} file will be copied to a file named + * {@code source} in a single part to {@code destinationDirectory}. The download will happen on the + * calling thread and {@code fileCompletionListener} will be notified synchronously before this + * method returns. + * + * @param source The source file name + * @param destinationDirectory The destination directory (if multipart is not supported) + * @param destinationPath The destination path (if multipart is supported) + * @param fileCompletionListener The listener to notify of completion + */ + public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener fileCompletionListener) { + final String blobName = getExistingRemoteFilename(source); + if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { + final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer(); + final Path destinationFilePath = destinationPath.resolve(source); + blobContainer.asyncBlobDownload(blobName, destinationFilePath, threadPool, fileCompletionListener); + } else { + // Fallback to older mechanism of downloading the file + try { + destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT); + fileCompletionListener.onResponse(source); + } catch (IOException e) { + fileCompletionListener.onFailure(e); + } + } + } + /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index b3ea2cdd02e21..285241ba89996 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -66,6 +66,7 @@ import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; import org.opensearch.common.UUIDs; +import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.Lucene; @@ -92,6 +93,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.translog.Translog; import java.io.Closeable; @@ -179,6 +181,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock(); private final ShardLock shardLock; private final OnClose onClose; + private final ShardPath shardPath; // used to ref count files when a new Reader is opened for PIT/Scroll queries // prevents segment files deletion until the PIT/Scroll expires or is discarded @@ -192,10 +195,17 @@ protected void closeInternal() { }; public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) { - this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY); + this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY, null); } - public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) { + public Store( + ShardId shardId, + IndexSettings indexSettings, + Directory directory, + ShardLock shardLock, + OnClose onClose, + ShardPath shardPath + ) { super(shardId, indexSettings); final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval); @@ -203,6 +213,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); this.shardLock = shardLock; this.onClose = onClose; + this.shardPath = shardPath; assert onClose != null; assert shardLock != null; assert shardLock.getShardId().equals(shardId); @@ -213,6 +224,11 @@ public Directory directory() { return directory; } + @InternalApi + public ShardPath shardPath() { + return shardPath; + } + /** * Returns the last committed segments info for this store * diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 7252fea044a02..64aacd68f2490 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -13,18 +13,20 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; +import org.opensearch.action.support.GroupedActionListener; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -106,31 +108,49 @@ public void getSegmentFiles( logger.trace("Downloading segments files from remote store {}", filesToFetch); RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - List downloadedSegments = new ArrayList<>(); + List toDownloadSegments = new ArrayList<>(); Collection directoryFiles = List.of(indexShard.store().directory().listAll()); if (remoteSegmentMetadata != null) { try { indexShard.store().incRef(); indexShard.remoteStore().incRef(); final Directory storeDirectory = indexShard.store().directory(); + final ShardPath shardPath = indexShard.shardPath(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - downloadedSegments.add(fileMetadata); + toDownloadSegments.add(fileMetadata); } - logger.trace("Downloaded segments from remote store {}", downloadedSegments); + downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener); + logger.trace("Downloaded segments from remote store {}", toDownloadSegments); } finally { indexShard.store().decRef(); indexShard.remoteStore().decRef(); } } - listener.onResponse(new GetSegmentFilesResponse(downloadedSegments)); } catch (Exception e) { listener.onFailure(e); } } + private void downloadSegments( + Directory storeDirectory, + RemoteSegmentStoreDirectory remoteStoreDirectory, + List toDownloadSegments, + ShardPath shardPath, + ActionListener completionListener + ) { + final Path indexPath = shardPath == null ? null : shardPath.resolveIndex(); + final GroupedActionListener batchDownloadListener = new GroupedActionListener<>( + ActionListener.map(completionListener, v -> new GetSegmentFilesResponse(toDownloadSegments)), + toDownloadSegments.size() + ); + ActionListener segmentsDownloadListener = ActionListener.map(batchDownloadListener, result -> null); + toDownloadSegments.forEach( + fileMetadata -> remoteStoreDirectory.copyTo(fileMetadata.name(), storeDirectory, indexPath, segmentsDownloadListener) + ); + } + @Override public String getDescription() { return "RemoteStoreReplicationSource"; diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index cd272d8f626d0..b2eb41828a4df 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -1780,7 +1780,7 @@ public Set getPendingDeletions() throws IOException { } }; - try (Store store = createStore(shardId, new IndexSettings(metadata, Settings.EMPTY), directory)) { + try (Store store = createStore(shardId, new IndexSettings(metadata, Settings.EMPTY), directory, shardPath)) { IndexShard shard = newShard( shardRouting, shardPath, diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 2948528ad82e1..b016e8be67799 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -28,6 +28,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; @@ -46,6 +47,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -61,6 +63,8 @@ import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -516,6 +520,114 @@ public void onFailure(Exception e) {} storeDirectory.close(); } + public void testCopyFilesToMultipart() throws Exception { + Settings settings = Settings.builder().build(); + FeatureFlags.initializeFeatureFlags(settings); + + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(3); + completionListener.onResponse(invocation.getArgument(0)); + return null; + }).when(blobContainer).asyncBlobDownload(any(), any(), any(), any()); + + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener() { + @Override + public void onResponse(String unused) { + downloadLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + Path path = createTempDir(); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS)); + verify(blobContainer, times(1)).asyncBlobDownload(contains(filename), eq(path.resolve(filename)), any(), any()); + verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any()); + } + + public void testCopyFilesTo() throws Exception { + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(String unused) { + downloadLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + Path path = createTempDir(); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); + verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); + } + + public void testCopyFilesToEmptyPath() throws Exception { + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(String unused) { + downloadLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); + verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); + } + + public void testCopyFilesToException() throws Exception { + String filename = "_0.cfe"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + Mockito.doThrow(new IOException()) + .when(storeDirectory) + .copyFrom(any(Directory.class), anyString(), anyString(), any(IOContext.class)); + CountDownLatch downloadLatch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(String unused) { + + } + + @Override + public void onFailure(Exception e) { + downloadLatch.countDown(); + } + }; + Path path = createTempDir(); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); + verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); + } + public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 8395b3e8ac08e..d7d326b325cc6 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -84,6 +84,7 @@ import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; @@ -798,7 +799,7 @@ public void testOnCloseCallback() throws IOException { assertEquals(shardId, theLock.getShardId()); assertEquals(lock, theLock); count.incrementAndGet(); - }); + }, null); assertEquals(count.get(), 0); final int iters = randomIntBetween(1, 10); @@ -809,6 +810,26 @@ public void testOnCloseCallback() throws IOException { assertEquals(count.get(), 1); } + public void testStoreShardPath() { + final ShardId shardId = new ShardId("index", "_na_", 1); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)) + .build(); + final Path path = createTempDir().resolve(shardId.getIndex().getUUID()).resolve(String.valueOf(shardId.id())); + final ShardPath shardPath = new ShardPath(false, path, path, shardId); + final Store store = new Store( + shardId, + IndexSettingsModule.newIndexSettings("index", settings), + StoreTests.newDirectory(random()), + new DummyShardLock(shardId), + Store.OnClose.EMPTY, + shardPath + ); + assertEquals(shardPath, store.shardPath()); + store.close(); + } + public void testStoreStats() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); Settings settings = Settings.builder() diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 43f2cce668e81..7a54c32248218 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -272,11 +272,11 @@ public Settings threadPoolSettings() { } protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { - return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex())); + return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()), shardPath); } - protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException { - return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); + protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardPath shardPath) throws IOException { + return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId), Store.OnClose.EMPTY, shardPath); } protected Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { @@ -654,7 +654,7 @@ protected IndexShard newShard( remotePath = createTempDir(); } - remoteStore = createRemoteStore(remotePath, routing, indexMetadata); + remoteStore = createRemoteStore(remotePath, routing, indexMetadata, shardPath); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, indexSettings.getSettings()); BlobStoreRepository repo = createRepository(remotePath); @@ -768,11 +768,12 @@ protected RepositoriesService createRepositoriesService() { return repositoriesService; } - protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { + protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata, ShardPath shardPath) + throws IOException { Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); ShardId shardId = shardRouting.shardId(); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = createRemoteSegmentStoreDirectory(shardId, path); - return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); + return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory, shardPath); } protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId shardId, Path path) throws IOException {