Skip to content

Commit

Permalink
Use DirectoryFactory interface to create remote directory (#6970)
Browse files Browse the repository at this point in the history
* Use DirectoryFactory interface to create remote directory

Signed-off-by: Sachin Kale <[email protected]>

* Fix tests

Signed-off-by: Sachin Kale <[email protected]>

---------

Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale authored Apr 4, 2023
1 parent f071c9b commit 56a2bed
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 37 deletions.
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Expand Down
10 changes: 3 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -194,7 +194,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -470,11 +470,7 @@ public synchronized IndexShard createShard(

Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(
this.indexSettings.getRemoteStoreRepository(),
this.indexSettings,
path
);
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @opensearch.internal
*/
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory {

private final Supplier<RepositoriesService> repositoriesService;

Expand All @@ -36,7 +36,8 @@ public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> reposito
}

@Override
public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException {
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
String repositoryName = indexSettings.getRemoteStoreRepository();
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

@Override
Expand Down Expand Up @@ -318,7 +318,7 @@ public IndicesService(
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = settings;
Expand Down Expand Up @@ -433,7 +433,7 @@ public IndicesService(
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = settings;
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ protected Node(
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);

final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
repositoriesServiceReference::get
);

Expand Down
16 changes: 0 additions & 16 deletions server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,6 @@ interface DirectoryFactory {
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* An interface that describes how to create a new remote directory instance per shard.
*/
@FunctionalInterface
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param repositoryName repository name
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public void setup() {
}

public void testNewDirectory() throws IOException {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1").build();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1")
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "remote_store_repository")
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
Expand All @@ -66,7 +69,7 @@ public void testNewDirectory() throws IOException {

when(repositoriesService.repository("remote_store_repository")).thenReturn(repository);

try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)) {
try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath)) {
assertTrue(directory instanceof RemoteSegmentStoreDirectory);
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
verify(blobStore, times(2)).blobContainer(blobPathCaptor.capture());
Expand All @@ -80,17 +83,14 @@ public void testNewDirectory() throws IOException {
}

public void testNewDirectoryRepositoryDoesNotExist() {
Settings settings = Settings.builder().build();
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "remote_store_repository").build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));

when(repositoriesService.repository("remote_store_repository")).thenThrow(new RepositoryMissingException("Missing"));

assertThrows(
IllegalArgumentException.class,
() -> remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)
);
assertThrows(IllegalArgumentException.class, () -> remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath));
}

}

0 comments on commit 56a2bed

Please sign in to comment.