-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Remote Store] Upload segments to remote store post refresh (#3460)
* Add RemoteDirectory interface to copy segment files to/from remote store Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]> * Add index level setting for remote store Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]> * Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener Co-authored-by: Sachin Kale <[email protected]> Signed-off-by: Sachin Kale <[email protected]> * Upload segment to remote store post refresh Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]>
- Loading branch information
1 parent
fc54154
commit fb13759
Showing
24 changed files
with
1,176 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 87 additions & 0 deletions
87
server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.shard; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.search.ReferenceManager; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.IOContext; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.NoSuchFileException; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
|
||
/** | ||
* RefreshListener implementation to upload newly created segment files to the remote store | ||
*/ | ||
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { | ||
|
||
private final Directory storeDirectory; | ||
private final Directory remoteDirectory; | ||
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398) | ||
private final Set<String> filesUploadedToRemoteStore; | ||
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); | ||
|
||
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException { | ||
this.storeDirectory = storeDirectory; | ||
this.remoteDirectory = remoteDirectory; | ||
// ToDo: Handle failures in reading list of files (GitHub #3397) | ||
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll())); | ||
} | ||
|
||
@Override | ||
public void beforeRefresh() throws IOException { | ||
// Do Nothing | ||
} | ||
|
||
/** | ||
* Upload new segment files created as part of the last refresh to the remote segment store. | ||
* The method also deletes segment files from remote store which are not part of local filesystem. | ||
* @param didRefresh true if the refresh opened a new reference | ||
* @throws IOException in case of I/O error in reading list of local files | ||
*/ | ||
@Override | ||
public void afterRefresh(boolean didRefresh) throws IOException { | ||
if (didRefresh) { | ||
Set<String> localFiles = Set.of(storeDirectory.listAll()); | ||
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> { | ||
try { | ||
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); | ||
filesUploadedToRemoteStore.add(file); | ||
} catch (NoSuchFileException e) { | ||
logger.info( | ||
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file), | ||
e | ||
); | ||
} catch (IOException e) { | ||
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) | ||
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); | ||
} | ||
}); | ||
|
||
Set<String> remoteFilesToBeDeleted = new HashSet<>(); | ||
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142) | ||
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> { | ||
try { | ||
remoteDirectory.deleteFile(file); | ||
remoteFilesToBeDeleted.add(file); | ||
} catch (IOException e) { | ||
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) | ||
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e); | ||
} | ||
}); | ||
|
||
remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove); | ||
} | ||
} | ||
} |
Oops, something went wrong.