From fb1375987643f7c99ae77442401caffca5dddc25 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 10 Jun 2022 11:02:53 +0530 Subject: [PATCH] [Remote Store] Upload segments to remote store post refresh (#3460) * Add RemoteDirectory interface to copy segment files to/from remote store Signed-off-by: Sachin Kale Co-authored-by: Sachin Kale * Add index level setting for remote store Signed-off-by: Sachin Kale Co-authored-by: Sachin Kale * Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener Co-authored-by: Sachin Kale Signed-off-by: Sachin Kale * Upload segment to remote store post refresh Signed-off-by: Sachin Kale Co-authored-by: Sachin Kale --- .../opensearch/index/shard/IndexShardIT.java | 3 +- .../cluster/metadata/IndexMetadata.java | 11 + .../common/settings/IndexScopedSettings.java | 4 +- .../opensearch/common/util/FeatureFlags.java | 6 + .../org/opensearch/index/IndexModule.java | 4 + .../org/opensearch/index/IndexService.java | 28 ++- .../org/opensearch/index/IndexSettings.java | 9 + .../opensearch/index/shard/IndexShard.java | 16 +- .../shard/RemoteStoreRefreshListener.java | 87 ++++++++ .../index/store/RemoteDirectory.java | 193 ++++++++++++++++++ .../index/store/RemoteDirectoryFactory.java | 37 ++++ .../index/store/RemoteIndexInput.java | 85 ++++++++ .../index/store/RemoteIndexOutput.java | 99 +++++++++ .../opensearch/indices/IndicesService.java | 8 +- .../opensearch/plugins/IndexStorePlugin.java | 17 ++ .../common/util/FeatureFlagTests.java | 7 + .../opensearch/index/IndexSettingsTests.java | 39 ++++ .../RemoteStoreRefreshListenerTests.java | 139 +++++++++++++ .../store/RemoteDirectoryFactoryTests.java | 65 ++++++ .../index/store/RemoteDirectoryTests.java | 158 ++++++++++++++ .../index/store/RemoteIndexInputTests.java | 99 +++++++++ .../index/store/RemoteIndexOutputTests.java | 68 ++++++ ...dicesLifecycleListenerSingleNodeTests.java | 3 +- .../index/shard/IndexShardTestCase.java | 3 +- 24 files changed, 1176 insertions(+), 12 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteDirectory.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java create mode 100644 server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 888881d43eb11..2bf73b34247b3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -675,7 +675,8 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + 
null ); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index ec70e642ababc..442137fb70e1f 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -283,6 +283,17 @@ public Iterator> settings() { Property.Final ); + public static final String SETTING_REMOTE_STORE = "index.remote_store"; + /** + * Used to specify if the index data should be persisted in the remote store. + */ + public static final Setting INDEX_REMOTE_STORE_SETTING = Setting.boolSetting( + SETTING_REMOTE_STORE, + false, + Property.IndexScope, + Property.Final + ); + public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index ba2666b53d7a8..75d7081e7729a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -217,7 +217,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { */ public static final Map FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( FeatureFlags.REPLICATION_TYPE, - IndexMetadata.INDEX_REPLICATION_TYPE_SETTING + IndexMetadata.INDEX_REPLICATION_TYPE_SETTING, + FeatureFlags.REMOTE_STORE, + IndexMetadata.INDEX_REMOTE_STORE_SETTING ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 0b31e3814667a..fa39dc9ac5aa0 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -23,6 +23,12 @@ public class FeatureFlags { */ public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled"; + /** + * Gates the visibility of the index setting that allows persisting data to remote store along with local disk. + * Once the feature is ready for production release, this feature flag can be removed. + */ + public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled"; + /** * Used to test feature flags whose values are expected to be booleans. 
* This method returns true if the value is "true" (case-insensitive), diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 49daf8293656c..2cea0e4e3e95c 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -70,6 +70,7 @@ import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -118,6 +119,8 @@ public final class IndexModule { private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory(); + private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory(); + private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new; public static final Setting INDEX_STORE_TYPE_SETTING = new Setting<>( @@ -516,6 +519,7 @@ public IndexService newIndexService( client, queryCache, directoryFactory, + REMOTE_DIRECTORY_FACTORY, eventListener, readerWrapperFactory, mapperRegistry, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 0a6d1501f2bea..f699278919d6b 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,6 +81,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexingOperationListener; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -96,6 +97,9 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final NodeEnvironment nodeEnv; private final ShardStoreDeleter shardStoreDeleter; private final IndexStorePlugin.DirectoryFactory directoryFactory; + private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; private final CheckedFunction readerWrapper; private final IndexCache indexCache; @@ -190,6 +195,7 @@ public IndexService( Client client, QueryCache queryCache, IndexStorePlugin.DirectoryFactory directoryFactory, + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, IndexEventListener eventListener, Function> wrapperFactory, MapperRegistry mapperRegistry, @@ -260,6 +266,7 @@ public IndexService( this.eventListener = eventListener; this.nodeEnv = nodeEnv; 
this.directoryFactory = directoryFactory; + this.remoteDirectoryFactory = remoteDirectoryFactory; this.recoveryStateFactory = recoveryStateFactory; this.engineFactory = Objects.requireNonNull(engineFactory); this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory); @@ -430,7 +437,8 @@ public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RepositoriesService repositoriesService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -504,6 +512,21 @@ public synchronized IndexShard createShard( } }; Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory remoteDirectory = null; + RemoteStoreRefreshListener remoteStoreRefreshListener = null; + if (this.indexSettings.isRemoteStoreEnabled()) { + try { + Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID()); + remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory); + } catch (RepositoryMissingException e) { + throw new IllegalArgumentException( + "Repository should be created before creating index with remote_store enabled setting", + e + ); + } + } + store = new Store( shardId, this.indexSettings, @@ -533,7 +556,8 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null + this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null, + remoteStoreRefreshListener ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index e40acb94ee498..ed3f6002be073 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -534,6 +534,7 @@ public final class IndexSettings { private final Settings nodeSettings; private final int numberOfShards; private final ReplicationType replicationType; + private final boolean isRemoteStoreEnabled; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock private volatile Settings settings; private volatile IndexMetadata indexMetadata; @@ -686,6 +687,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.indexMetadata = indexMetadata; numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); + isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE, false); this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); @@ -927,6 +929,13 @@ public boolean isSegRepEnabled() { return ReplicationType.SEGMENT.equals(replicationType); } + /** + * Returns if remote store is enabled for this index. 
+ */ + public boolean isRemoteStoreEnabled() { + return isRemoteStoreEnabled; + } + /** * Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the * index settings and the node settings where node settings are overwritten by index settings. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5d11c34ca205c..bad412003df26 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -304,6 +304,8 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final ReferenceManager.RefreshListener checkpointRefreshListener; + private final RemoteStoreRefreshListener remoteStoreRefreshListener; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -325,7 +327,8 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher + @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, + @Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -413,6 +416,7 @@ public boolean shouldCache(Query query) { } else { this.checkpointRefreshListener = null; } + this.remoteStoreRefreshListener = remoteStoreRefreshListener; } public ThreadPool getThreadPool() { @@ -3139,11 +3143,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } }; - final List internalRefreshListener; + final List internalRefreshListener = new ArrayList<>(); + internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + if (remoteStoreRefreshListener != null && shardRouting.primary()) { + internalRefreshListener.add(remoteStoreRefreshListener); + } if (this.checkpointRefreshListener != null) { - internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); - } else { - internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + internalRefreshListener.add(checkpointRefreshListener); } return this.engineConfigFactory.newEngineConfig( diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java new file mode 100644 index 0000000000000..4b549ec485c0e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.shard; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * RefreshListener implementation to upload newly created segment files to the remote store + */ +public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + + private final Directory storeDirectory; + private final Directory remoteDirectory; + // ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398) + private final Set filesUploadedToRemoteStore; + private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + + public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException { + this.storeDirectory = storeDirectory; + this.remoteDirectory = remoteDirectory; + // ToDo: Handle failures in reading list of files (GitHub #3397) + this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll())); + } + + @Override + public void beforeRefresh() throws IOException { + // Do Nothing + } + + /** + * Upload new segment files created as part of the last refresh to the remote segment store. + * The method also deletes segment files from remote store which are not part of local filesystem. + * @param didRefresh true if the refresh opened a new reference + * @throws IOException in case of I/O error in reading list of local files + */ + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (didRefresh) { + Set localFiles = Set.of(storeDirectory.listAll()); + localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> { + try { + remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + filesUploadedToRemoteStore.add(file); + } catch (NoSuchFileException e) { + logger.info( + () -> new ParameterizedMessage("The file {} does not exist anymore. 
It can happen in case of temp files", file), + e + ); + } catch (IOException e) { + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + } + }); + + Set remoteFilesToBeDeleted = new HashSet<>(); + // ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142) + filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> { + try { + remoteDirectory.deleteFile(file); + remoteFilesToBeDeleted.add(file); + } catch (IOException e) { + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e); + } + }); + + remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java new file mode 100644 index 0000000000000..2f8f977537327 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. + * A remoteDirectory contains only files (no sub-folder hierarchy). This class does not support all the methods in + * the Directory interface. Currently, it contains implementation of methods which are used to copy files to/from + * the remote store. Implementation of remaining methods will be added as remote store is integrated with + * replication, peer recovery etc. + * + * @opensearch.internal + */ +public final class RemoteDirectory extends Directory { + + private final BlobContainer blobContainer; + + public RemoteDirectory(BlobContainer blobContainer) { + this.blobContainer = blobContainer; + } + + /** + * Returns names of all files stored in this directory. The output must be in sorted (UTF-16, + * java's {@link String#compareTo}) order. + */ + @Override + public String[] listAll() throws IOException { + return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new); + } + + /** + * Removes an existing file in the directory. + * + *
This method will not throw an exception when the file doesn't exist and simply ignores this case. + * This is a deviation from the {@code Directory} interface where it is expected to throw either + * {@link NoSuchFileException} or {@link FileNotFoundException} if {@code name} points to a non-existing file. + * + * @param name the name of an existing file. + * @throws IOException if the file exists but could not be deleted. + */ + @Override + public void deleteFile(String name) throws IOException { + // ToDo: Add a check for file existence + blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name)); + } + + /** + * Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote + * store. + * + *
In the {@link Directory} interface, it is expected to throw {@link java.nio.file.FileAlreadyExistsException} + * if the file already exists in the remote store. As this method does not open a file, it does not throw the + * exception. + * + * @param name the name of the file to copy to remote store. + */ + @Override + public IndexOutput createOutput(String name, IOContext context) { + return new RemoteIndexOutput(name, blobContainer); + } + + /** + * Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream. + * + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist + */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name)); + } + + /** + * Closes the directory by deleting all the files in this directory + */ + @Override + public void close() throws IOException { + blobContainer.delete(); + } + + /** + * Returns the byte length of a file in the directory. + * + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist + */ + @Override + public long fileLength(String name) throws IOException { + // ToDo: Instead of calling remote store each time, keep a cache with segment metadata + Map metadata = blobContainer.listBlobsByPrefix(name); + if (metadata.containsKey(name)) { + return metadata.get(name).length(); + } + throw new NoSuchFileException(name); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once soft deleting is supported segment files in the remote store, this method will provide details of + * number of files marked as deleted but not actually deleted from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public Set getPendingDeletions() throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Temporary IndexOutput is not required while working with Remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Segment upload to the remote store will be permanent and does not require a separate sync API. + * This may change in the future if segment upload to remote store happens via cache and we need sync API to write + * the cache contents to the store permanently. + * + * @throws UnsupportedOperationException always + */ + @Override + public void sync(Collection names) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once metadata to be stored with each shard is finalized, syncMetaData method will be used to sync the directory + * metadata to the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void syncMetaData() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. 
+ * As this method is used by IndexWriter to publish commits, the implementation of this method is required when + * IndexWriter is backed by RemoteDirectory. + * + * @throws UnsupportedOperationException always + */ + @Override + public void rename(String source, String dest) throws IOException { + throw new UnsupportedOperationException(); + + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once locking segment files in remote store is supported, implementation of this method is required with + * remote store specific LockFactory. + * + * @throws UnsupportedOperationException always + */ + @Override + public Lock obtainLock(String name) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java new file mode 100644 index 0000000000000..eb7912a1f4a2b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; + +/** + * Factory for a remote store directory + * + * @opensearch.internal + */ +public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { + + @Override + public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobPath blobPath = new BlobPath(); + blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); + BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); + return new RemoteDirectory(blobContainer); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java new file mode 100644 index 0000000000000..24e1128dec1b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store. + * Currently, only methods from {@link IndexInput} that are required for reading a file from remote store are + * implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, + * peer recovery etc. 
+ * ToDo: Extend ChecksumIndexInput + * @see RemoteDirectory + * + * @opensearch.internal + */ +public class RemoteIndexInput extends IndexInput { + + private final InputStream inputStream; + private final long size; + + public RemoteIndexInput(String name, InputStream inputStream, long size) { + super(name); + this.inputStream = inputStream; + this.size = size; + } + + @Override + public byte readByte() throws IOException { + byte[] buffer = new byte[1]; + inputStream.read(buffer); + return buffer[0]; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + inputStream.read(b, offset, len); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public long length() { + return size; + } + + @Override + public void seek(long pos) throws IOException { + inputStream.skip(pos); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java new file mode 100644 index 0000000000000..2af65452a6eac --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.InputStreamIndexInput; + +import java.io.IOException; + +/** + * Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store. + * Currently, only methods from {@link IndexOutput} that are required for uploading a segment file to remote store are + * implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, + * peer recovery etc. + * ToDo: Extend ChecksumIndexInput + * @see RemoteDirectory + * + * @opensearch.internal + */ +public class RemoteIndexOutput extends IndexOutput { + + private final BlobContainer blobContainer; + + public RemoteIndexOutput(String name, BlobContainer blobContainer) { + super(name, name); + this.blobContainer = blobContainer; + } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + assert input instanceof IndexInput : "input should be instance of IndexInput"; + blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false); + } + + /** + * This is a no-op. 
Once segment file upload to the remote store is complete, we don't need to explicitly close + * the stream. It is taken care by internal APIs of client of the remote store. + */ + @Override + public void close() throws IOException { + // do nothing + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void writeByte(byte b) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void writeBytes(byte[] byteArray, int offset, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not directly used for the file transfer to/from the remote store. + * But the checksum is important to verify integrity of the data and that means implementing this method will + * be required for the segment upload as well. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getChecksum() throws IOException { + throw new UnsupportedOperationException(); + } + +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 79fd2893fb78c..b2f6f10c19638 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -859,7 +859,13 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard( + shardRouting, + globalCheckpointSyncer, + retentionLeaseSyncer, + checkpointPublisher, + repositoriesService + ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index 2f549fec54759..52ddf6dcf2753 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,6 +39,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.repositories.Repository; 
import java.io.IOException; import java.util.Collections; @@ -66,6 +67,22 @@ interface DirectoryFactory { Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException; } + /** + * An interface that describes how to create a new remote directory instance per shard. + */ + @FunctionalInterface + interface RemoteDirectoryFactory { + /** + * Creates a new remote directory per shard. This method is called once per shard on shard creation. + * @param indexSettings the shards index settings + * @param shardPath the path the shard is using + * @param repository to get the BlobContainer details + * @return a new RemoteDirectory instance + * @throws IOException if an IOException occurs while opening the directory + */ + Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException; + } + /** * The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting * {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index 1084f9c658db4..a4f2b242564e2 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -21,6 +21,7 @@ public class FeatureFlagTests extends OpenSearchTestCase { @BeforeClass public static void enableFeature() { AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.REPLICATION_TYPE, "true")); + AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.REMOTE_STORE, "true")); } public void testReplicationTypeFeatureFlag() { @@ -40,4 +41,10 @@ public void testNonBooleanFeatureFlag() { assertNotNull(System.getProperty(javaVersionProperty)); assertFalse(FeatureFlags.isEnabled(javaVersionProperty)); } + + public void testRemoteStoreFeatureFlag() { + String remoteStoreFlag = FeatureFlags.REMOTE_STORE; + assertNotNull(System.getProperty(remoteStoreFlag)); + assertTrue(FeatureFlags.isEnabled(remoteStoreFlag)); + } } diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 71433673eef5a..4b3dc041b9f54 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.translog.Translog; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -56,6 +57,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.StringContains.containsString; import static org.hamcrest.object.HasToString.hasToString; +import static org.opensearch.common.settings.IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS; public class IndexSettingsTests extends OpenSearchTestCase { @@ -753,4 +755,41 @@ public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() { assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L)); assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L)); } + + public void 
testRemoteStoreDefaultSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertFalse(settings.isRemoteStoreEnabled()); + } + + public void testRemoteStoreExplicitSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REMOTE_STORE, true) + .build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertTrue(settings.isRemoteStoreEnabled()); + } + + public void testUpdateRemoteStoreFails() { + Set> remoteStoreSettingSet = new HashSet<>(); + remoteStoreSettingSet.add(FEATURE_FLAGGED_INDEX_SETTINGS.get(FeatureFlags.REMOTE_STORE)); + IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet); + IllegalArgumentException error = expectThrows( + IllegalArgumentException.class, + () -> settings.updateSettings( + Settings.builder().put("index.remote_store", randomBoolean()).build(), + Settings.builder(), + Settings.builder(), + "index" + ) + ); + assertEquals(error.getMessage(), "final index setting [index.remote_store], not updateable"); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java new file mode 100644 index 0000000000000..af92d821a9043 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.shard; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doThrow; + +public class RemoteStoreRefreshListenerTests extends OpenSearchTestCase { + private Directory storeDirectory; + private Directory remoteDirectory; + + private RemoteStoreRefreshListener remoteStoreRefreshListener; + + public void setup(String[] remoteFiles) throws IOException { + storeDirectory = mock(Directory.class); + remoteDirectory = mock(Directory.class); + when(remoteDirectory.listAll()).thenReturn(remoteFiles); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(storeDirectory, remoteDirectory); + } + + public void testAfterRefreshFalse() throws IOException { + setup(new String[0]); + remoteStoreRefreshListener.afterRefresh(false); + verify(storeDirectory, times(0)).listAll(); + } + + public void testAfterRefreshTrueNoLocalFiles() throws IOException { + setup(new String[0]); + + when(storeDirectory.listAll()).thenReturn(new String[0]); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any()); + verify(remoteDirectory, times(0)).deleteFile(any()); + } + + public void testAfterRefreshOnlyUploadFiles() throws IOException { + setup(new String[0]); + + String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + verify(remoteDirectory, times(0)).deleteFile(any()); + } + + public void testAfterRefreshOnlyUploadAndDelete() throws IOException { + setup(new String[] { "0.si", "0.cfs" }); + + String[] localFiles = new String[] { "segments_1", "1.si", "1.cfs", "1.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.si", "1.si", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT); + verify(remoteDirectory).deleteFile("0.si"); + verify(remoteDirectory).deleteFile("0.cfs"); + } + + public void testAfterRefreshOnlyDelete() throws IOException { + setup(new String[] { "0.si", "0.cfs" }); + + String[] localFiles = new String[] { "0.si" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any()); + 
verify(remoteDirectory).deleteFile("0.cfs"); + } + + public void testAfterRefreshTempLocalFile() throws IOException { + setup(new String[0]); + + String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs.tmp" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + doThrow(new NoSuchFileException("0.cfs.tmp")).when(remoteDirectory) + .copyFrom(storeDirectory, "0.cfs.tmp", "0.cfs.tmp", IOContext.DEFAULT); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); + verify(remoteDirectory, times(0)).deleteFile(any()); + } + + public void testAfterRefreshConsecutive() throws IOException { + setup(new String[0]); + + String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + doThrow(new IOException("0.cfs")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfe", IOContext.DEFAULT); + doThrow(new IOException("0.cfe")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + verify(remoteDirectory, times(0)).deleteFile(any()); + + String[] localFilesSecondRefresh = new String[] { "segments_1", "0.cfs", "1.cfs", "1.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFilesSecondRefresh); + + remoteStoreRefreshListener.afterRefresh(true); + + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT); + verify(remoteDirectory).deleteFile("0.si"); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java new file mode 100644 index 0000000000000..d781fad9ab99c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { + + private RemoteDirectoryFactory remoteDirectoryFactory; + + @Before + public void setup() { + remoteDirectoryFactory = new RemoteDirectoryFactory(); + } + + public void testNewDirectory() throws IOException { + Settings settings = Settings.builder().build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); + Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); + ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(repository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository); + assertTrue(directory instanceof RemoteDirectory); + ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); + verify(blobStore).blobContainer(blobPathCaptor.capture()); + BlobPath blobPath = blobPathCaptor.getValue(); + assertEquals("foo/0/", blobPath.buildAsString()); + + directory.listAll(); + verify(blobContainer).listBlobs(); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java new file mode 100644 index 0000000000000..c2c365d9140df --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.*; + +public class RemoteDirectoryTests extends OpenSearchTestCase { + private BlobContainer blobContainer; + + private RemoteDirectory remoteDirectory; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + remoteDirectory = new RemoteDirectory(blobContainer); + } + + public void testListAllEmpty() throws IOException { + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + String[] actualFileNames = remoteDirectory.listAll(); + String[] expectedFileName = new String[] {}; + assertArrayEquals(expectedFileName, actualFileNames); + } + + public void testListAll() throws IOException { + Map fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl") + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + + when(blobContainer.listBlobs()).thenReturn(fileNames); + + String[] actualFileNames = remoteDirectory.listAll(); + String[] expectedFileName = new String[] { "abc", "jkl", "lmn", "pqr", "xyz" }; + assertArrayEquals(expectedFileName, actualFileNames); + } + + public void testListAllException() throws IOException { + when(blobContainer.listBlobs()).thenThrow(new IOException("Error reading blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.listAll()); + } + + public void testDeleteFile() throws IOException { + remoteDirectory.deleteFile("segment_1"); + + verify(blobContainer).deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1")); + } + + public void testDeleteFileException() throws IOException { + doThrow(new IOException("Error writing to blob store")).when(blobContainer) + .deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1")); + + assertThrows(IOException.class, () -> remoteDirectory.deleteFile("segment_1")); + } + + public void testCreateOutput() { + IndexOutput indexOutput = remoteDirectory.createOutput("segment_1", IOContext.DEFAULT); + assertTrue(indexOutput instanceof RemoteIndexOutput); + assertEquals("segment_1", indexOutput.getName()); + } + + public void testOpenInput() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + Map fileInfo = new HashMap<>(); + fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); + when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT); + assertTrue(indexInput instanceof RemoteIndexInput); + assertEquals(100, indexInput.length()); + } + + public void testOpenInputIOException() throws IOException { + when(blobContainer.readBlob("segment_1")).thenThrow(new IOException("Error while reading")); + + assertThrows(IOException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); 
+ } + + public void testOpenInputNoSuchFileException() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1")); + + assertThrows(NoSuchFileException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); + } + + public void testClose() throws IOException { + remoteDirectory.close(); + + verify(blobContainer).delete(); + } + + public void testCloseIOException() throws IOException { + when(blobContainer.delete()).thenThrow(new IOException("Error while writing to blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.close()); + } + + public void testFileLength() throws IOException { + Map fileInfo = new HashMap<>(); + fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); + when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + assertEquals(100, remoteDirectory.fileLength("segment_1")); + } + + public void testFileLengthIOException() throws IOException { + when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1")); + + assertThrows(IOException.class, () -> remoteDirectory.fileLength("segment_1")); + } + + public void testGetPendingDeletions() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.getPendingDeletions()); + } + + public void testCreateTempOutput() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.createTempOutput("segment_1", "tmp", IOContext.DEFAULT)); + } + + public void testSync() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.sync(Collections.emptyList())); + } + + public void testRename() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.rename("segment_1", "segment_2")); + } + + public void testObtainLock() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.obtainLock("segment_1")); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java new file mode 100644 index 0000000000000..c2f81c035e424 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.index.store;
+
+import org.junit.Before;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.mockito.Mockito.*;
+
+public class RemoteIndexInputTests extends OpenSearchTestCase {
+
+    private static final String FILENAME = "segment_1";
+    private static final long FILESIZE = 200;
+
+    private InputStream inputStream;
+    private RemoteIndexInput remoteIndexInput;
+
+    @Before
+    public void setup() {
+        inputStream = mock(InputStream.class);
+        remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE);
+    }
+
+    public void testReadByte() throws IOException {
+        InputStream inputStream = spy(InputStream.class);
+        remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE);
+
+        when(inputStream.read()).thenReturn(10);
+
+        assertEquals(10, remoteIndexInput.readByte());
+
+        verify(inputStream).read(any());
+    }
+
+    public void testReadByteIOException() throws IOException {
+        when(inputStream.read(any())).thenThrow(new IOException("Error reading"));
+
+        assertThrows(IOException.class, () -> remoteIndexInput.readByte());
+    }
+
+    public void testReadBytes() throws IOException {
+        byte[] buffer = new byte[10];
+        remoteIndexInput.readBytes(buffer, 10, 20);
+
+        verify(inputStream).read(buffer, 10, 20);
+    }
+
+    public void testReadBytesIOException() throws IOException {
+        byte[] buffer = new byte[10];
+        when(inputStream.read(buffer, 10, 20)).thenThrow(new IOException("Error reading"));
+
+        assertThrows(IOException.class, () -> remoteIndexInput.readBytes(buffer, 10, 20));
+    }
+
+    public void testClose() throws IOException {
+        remoteIndexInput.close();
+
+        verify(inputStream).close();
+    }
+
+    public void testCloseIOException() throws IOException {
+        doThrow(new IOException("Error closing")).when(inputStream).close();
+
+        assertThrows(IOException.class, () -> remoteIndexInput.close());
+    }
+
+    public void testLength() {
+        assertEquals(FILESIZE, remoteIndexInput.length());
+    }
+
+    public void testSeek() throws IOException {
+        remoteIndexInput.seek(10);
+
+        verify(inputStream).skip(10);
+    }
+
+    public void testSeekIOException() throws IOException {
+        when(inputStream.skip(10)).thenThrow(new IOException("Error reading"));
+
+        assertThrows(IOException.class, () -> remoteIndexInput.seek(10));
+    }
+
+    public void testGetFilePointer() {
+        assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.getFilePointer());
+    }
+
+    public void testSlice() {
+        assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.slice("Slice middle", 50, 100));
+    }
+}
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java
new file mode 100644
index 0000000000000..64975f2ac4892
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java
@@ -0,0 +1,68 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store;
+
+import org.apache.lucene.store.IndexInput;
+import org.junit.Before;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.lucene.store.InputStreamIndexInput;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+public class RemoteIndexOutputTests extends OpenSearchTestCase {
+    private static final String FILENAME = "segment_1";
+
+    private BlobContainer blobContainer;
+
+    private RemoteIndexOutput remoteIndexOutput;
+
+    @Before
+    public void setup() {
+        blobContainer = mock(BlobContainer.class);
+        remoteIndexOutput = new RemoteIndexOutput(FILENAME, blobContainer);
+    }
+
+    public void testCopyBytes() throws IOException {
+        IndexInput indexInput = mock(IndexInput.class);
+        remoteIndexOutput.copyBytes(indexInput, 100);
+
+        verify(blobContainer).writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false));
+    }
+
+    public void testCopyBytesIOException() throws IOException {
+        doThrow(new IOException("Error writing")).when(blobContainer)
+            .writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false));
+
+        IndexInput indexInput = mock(IndexInput.class);
+        assertThrows(IOException.class, () -> remoteIndexOutput.copyBytes(indexInput, 100));
+    }
+
+    public void testWriteByte() {
+        byte b = 10;
+        assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeByte(b));
+    }
+
+    public void testWriteBytes() {
+        byte[] buffer = new byte[10];
+        assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeBytes(buffer, 50, 60));
+    }
+
+    public void testGetFilePointer() {
+        assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getFilePointer());
+    }
+
+    public void testGetChecksum() {
+        assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getChecksum());
+    }
+}
diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java
index 0989bf869f18e..213a22539971f 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java
@@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
                 newRouting,
                 s -> {},
                 RetentionLeaseSyncer.EMPTY,
-                SegmentReplicationCheckpointPublisher.EMPTY
+                SegmentReplicationCheckpointPublisher.EMPTY,
+                null
             );
             IndexShardTestCase.updateRoutingEntry(shard, newRouting);
             assertEquals(5, counter.get());
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 371fa6d102304..62c52ab636255 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -525,7 +525,8 @@ protected IndexShard newShard(
                 globalCheckpointSyncer,
                 retentionLeaseSyncer,
                 breakerService,
-                checkpointPublisher
+                checkpointPublisher,
+                null
             );
             indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             success = true;
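
Reviewer note: the RemoteIndexInputTests added above effectively pin down how RemoteIndexInput is expected to delegate to the wrapped InputStream: single-byte reads go through a one-byte buffer, bulk reads and seeks map to InputStream.read and InputStream.skip, and positional APIs remain unsupported. A minimal sketch that would satisfy those assertions could look like the code below; the class name RemoteIndexInputSketch and its method bodies are illustrative assumptions inferred from the tests, not the RemoteIndexInput implementation added earlier in this patch.

package org.opensearch.index.store;

import org.apache.lucene.store.IndexInput;

import java.io.IOException;
import java.io.InputStream;

/**
 * Illustrative sketch only: reconstructs the behaviour that RemoteIndexInputTests
 * stubs and verifies. The authoritative class is RemoteIndexInput from this patch.
 */
public class RemoteIndexInputSketch extends IndexInput {

    private final InputStream inputStream;
    private final long size;

    public RemoteIndexInputSketch(String name, InputStream inputStream, long size) {
        super(name);
        this.inputStream = inputStream;
        this.size = size;
    }

    @Override
    public byte readByte() throws IOException {
        byte[] buffer = new byte[1];
        inputStream.read(buffer); // testReadByte verifies a single read(byte[]) call
        return buffer[0];
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
        inputStream.read(b, offset, len); // testReadBytes verifies the exact (buffer, 10, 20) call
    }

    @Override
    public void close() throws IOException {
        inputStream.close(); // testClose / testCloseIOException
    }

    @Override
    public long length() {
        return size; // testLength expects the size passed to the constructor
    }

    @Override
    public void seek(long pos) throws IOException {
        inputStream.skip(pos); // testSeek verifies skip(10)
    }

    @Override
    public long getFilePointer() {
        throw new UnsupportedOperationException(); // testGetFilePointer
    }

    @Override
    public IndexInput slice(String sliceDescription, long offset, long length) {
        throw new UnsupportedOperationException(); // testSlice
    }
}

Because the remote blob is consumed as a sequential stream, random access operations such as getFilePointer() and slice() are left unsupported in this sketch, which matches the UnsupportedOperationException assertions in the tests.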