-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
…5773) * Introduce remote translog transfer support Signed-off-by: Bukhtawar Khan <[email protected]>
- Loading branch information
1 parent
05e4365
commit f9ea95e
Showing
18 changed files
with
1,209 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
71 changes: 71 additions & 0 deletions
71
server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.translog.transfer; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.ActionRunnable; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.blobstore.BlobStore; | ||
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.concurrent.ExecutorService; | ||
|
||
/** | ||
* Service that handles remote transfer of translog and checkpoint files | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class BlobStoreTransferService implements TransferService { | ||
|
||
private final BlobStore blobStore; | ||
private final ExecutorService executorService; | ||
|
||
private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); | ||
|
||
public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) { | ||
this.blobStore = blobStore; | ||
this.executorService = executorService; | ||
} | ||
|
||
@Override | ||
public void uploadBlobAsync( | ||
final TransferFileSnapshot fileSnapshot, | ||
Iterable<String> remoteTransferPath, | ||
ActionListener<TransferFileSnapshot> listener | ||
) { | ||
assert remoteTransferPath instanceof BlobPath; | ||
BlobPath blobPath = (BlobPath) remoteTransferPath; | ||
executorService.execute(ActionRunnable.wrap(listener, l -> { | ||
try (InputStream inputStream = fileSnapshot.inputStream()) { | ||
blobStore.blobContainer(blobPath) | ||
.writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); | ||
l.onResponse(fileSnapshot); | ||
} catch (Exception e) { | ||
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); | ||
l.onFailure(new FileTransferException(fileSnapshot, e)); | ||
} | ||
})); | ||
} | ||
|
||
@Override | ||
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException { | ||
assert remoteTransferPath instanceof BlobPath; | ||
BlobPath blobPath = (BlobPath) remoteTransferPath; | ||
try (InputStream inputStream = fileSnapshot.inputStream()) { | ||
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); | ||
} catch (Exception ex) { | ||
throw ex; | ||
} | ||
} | ||
} |
223 changes: 223 additions & 0 deletions
223
server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.translog.transfer; | ||
|
||
import org.opensearch.common.Nullable; | ||
import org.opensearch.common.io.stream.BytesStreamInput; | ||
import org.opensearch.common.io.stream.InputStreamStreamInput; | ||
import org.opensearch.core.internal.io.IOUtils; | ||
import org.opensearch.index.translog.BufferedChecksumStreamInput; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.channels.Channels; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Snapshot of a single file that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class FileSnapshot implements Closeable { | ||
|
||
private final String name; | ||
@Nullable | ||
private final FileChannel fileChannel; | ||
@Nullable | ||
private Path path; | ||
@Nullable | ||
private byte[] content; | ||
|
||
private FileSnapshot(Path path) throws IOException { | ||
Objects.requireNonNull(path); | ||
this.name = path.getFileName().toString(); | ||
this.path = path; | ||
this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); | ||
} | ||
|
||
private FileSnapshot(String name, byte[] content) { | ||
Objects.requireNonNull(name); | ||
this.name = name; | ||
this.content = content; | ||
this.fileChannel = null; | ||
} | ||
|
||
public Path getPath() { | ||
return path; | ||
} | ||
|
||
public String getName() { | ||
return name; | ||
} | ||
|
||
public long getContentLength() throws IOException { | ||
return fileChannel == null ? content.length : fileChannel.size(); | ||
} | ||
|
||
public InputStream inputStream() throws IOException { | ||
return fileChannel != null | ||
? new BufferedChecksumStreamInput( | ||
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()), | ||
path.toString() | ||
) | ||
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(name, content, path); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
FileSnapshot other = (FileSnapshot) o; | ||
return Objects.equals(this.name, other.name) | ||
&& Objects.equals(this.content, other.content) | ||
&& Objects.equals(this.path, other.path); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return new StringBuilder("FileInfo [").append(" name = ") | ||
.append(name) | ||
.append(", path = ") | ||
.append(path.toUri()) | ||
.append("]") | ||
.toString(); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
IOUtils.close(fileChannel); | ||
} | ||
|
||
/** | ||
* Snapshot of a single file with primary term that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public static class TransferFileSnapshot extends FileSnapshot { | ||
|
||
private final long primaryTerm; | ||
|
||
public TransferFileSnapshot(Path path, long primaryTerm) throws IOException { | ||
super(path); | ||
this.primaryTerm = primaryTerm; | ||
} | ||
|
||
public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { | ||
super(name, content); | ||
this.primaryTerm = primaryTerm; | ||
} | ||
|
||
public long getPrimaryTerm() { | ||
return primaryTerm; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(primaryTerm, super.hashCode()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (super.equals(o)) { | ||
if (this == o) return true; | ||
if (getClass() != o.getClass()) return false; | ||
TransferFileSnapshot other = (TransferFileSnapshot) o; | ||
return Objects.equals(this.primaryTerm, other.primaryTerm); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
/** | ||
* Snapshot of a single .tlg file that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public static final class TranslogFileSnapshot extends TransferFileSnapshot { | ||
|
||
private final long generation; | ||
|
||
public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException { | ||
super(path, primaryTerm); | ||
this.generation = generation; | ||
} | ||
|
||
public long getGeneration() { | ||
return generation; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(generation, super.hashCode()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (super.equals(o)) { | ||
if (this == o) return true; | ||
if (getClass() != o.getClass()) return false; | ||
TranslogFileSnapshot other = (TranslogFileSnapshot) o; | ||
return Objects.equals(this.generation, other.generation); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
/** | ||
* Snapshot of a single .ckp file that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public static final class CheckpointFileSnapshot extends TransferFileSnapshot { | ||
|
||
private final long generation; | ||
|
||
private final long minTranslogGeneration; | ||
|
||
public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException { | ||
super(path, primaryTerm); | ||
this.minTranslogGeneration = minTranslogGeneration; | ||
this.generation = generation; | ||
} | ||
|
||
public long getGeneration() { | ||
return generation; | ||
} | ||
|
||
public long getMinTranslogGeneration() { | ||
return minTranslogGeneration; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(generation, minTranslogGeneration, super.hashCode()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (super.equals(o)) { | ||
if (this == o) return true; | ||
if (getClass() != o.getClass()) return false; | ||
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; | ||
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration) | ||
&& Objects.equals(this.generation, other.generation); | ||
} | ||
return false; | ||
} | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.translog.transfer; | ||
|
||
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; | ||
|
||
/** | ||
* Exception when a single file transfer encounters a failure | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class FileTransferException extends RuntimeException { | ||
|
||
private final TransferFileSnapshot fileSnapshot; | ||
|
||
public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) { | ||
super(cause); | ||
this.fileSnapshot = fileSnapshot; | ||
} | ||
|
||
public TransferFileSnapshot getFileSnapshot() { | ||
return fileSnapshot; | ||
} | ||
} |
Oops, something went wrong.