-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Remote Translog] Introduce remote translog transfer support #4480
Changes from all commits
b12d793
5ced7a7
0792470
6dade64
0c1deac
985341a
4eec4f6
6b7df96
0b5c0f6
0d834be
09ab574
0bc8a30
8a5da20
cc4d85d
2cbb582
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import org.opensearch.common.io.stream.FilterStreamInput; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
|
||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.util.zip.CRC32; | ||
import java.util.zip.Checksum; | ||
|
@@ -117,7 +118,11 @@ public void reset() throws IOException { | |
|
||
@Override | ||
public int read() throws IOException { | ||
return readByte() & 0xFF; | ||
try { | ||
return readByte() & 0xFF; | ||
} catch (EOFException e) { | ||
return -1; | ||
} | ||
Comment on lines
+121
to
+125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this change ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is due to an interface contract that OpenSearch was breaking. This is unrelated to the change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we have a base contract test for a stream input. Can you create an issue to create a contract test for this class since you've got the full context for this specific miss? |
||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.translog.transfer; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.ActionRunnable; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.blobstore.BlobStore; | ||
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.concurrent.ExecutorService; | ||
|
||
/** | ||
* Service that handles remote transfer of translog and checkpoint files | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class BlobStoreTransferService implements TransferService { | ||
|
||
private final BlobStore blobStore; | ||
private final ExecutorService executorService; | ||
|
||
private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); | ||
|
||
public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) { | ||
this.blobStore = blobStore; | ||
this.executorService = executorService; | ||
} | ||
|
||
@Override | ||
public void uploadBlobAsync( | ||
final TransferFileSnapshot fileSnapshot, | ||
Iterable<String> remoteTransferPath, | ||
ActionListener<TransferFileSnapshot> listener | ||
) { | ||
assert remoteTransferPath instanceof BlobPath; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not make the parameter type a BlobPath to let the compiler do this check for you? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the interface is generic and not tied to a store, could be extended to a streaming store as well There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since you currently only have one instance of a TransferService, I would recommend not introducing an interface and do things like this. It's easy enough to refactor out an interface if/when you need it, but until you really need it you can only make educated guesses about what the generic interface should look like. Not a huge deal, I'll defer to your preference. |
||
BlobPath blobPath = (BlobPath) remoteTransferPath; | ||
executorService.execute(ActionRunnable.wrap(listener, l -> { | ||
try (InputStream inputStream = fileSnapshot.inputStream()) { | ||
blobStore.blobContainer(blobPath) | ||
.writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); | ||
l.onResponse(fileSnapshot); | ||
} catch (Exception e) { | ||
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); | ||
l.onFailure(new FileTransferException(fileSnapshot, e)); | ||
} | ||
})); | ||
} | ||
|
||
@Override | ||
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException { | ||
assert remoteTransferPath instanceof BlobPath; | ||
Bukhtawar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
BlobPath blobPath = (BlobPath) remoteTransferPath; | ||
try (InputStream inputStream = fileSnapshot.inputStream()) { | ||
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); | ||
} catch (Exception ex) { | ||
throw ex; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.translog.transfer; | ||
|
||
import org.opensearch.common.Nullable; | ||
import org.opensearch.common.io.stream.BytesStreamInput; | ||
import org.opensearch.common.io.stream.InputStreamStreamInput; | ||
import org.opensearch.core.internal.io.IOUtils; | ||
import org.opensearch.index.translog.BufferedChecksumStreamInput; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.channels.Channels; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Snapshot of a single file that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class FileSnapshot implements Closeable { | ||
|
||
private final String name; | ||
@Nullable | ||
private final FileChannel fileChannel; | ||
@Nullable | ||
private Path path; | ||
@Nullable | ||
private byte[] content; | ||
|
||
private FileSnapshot(Path path) throws IOException { | ||
Objects.requireNonNull(path); | ||
this.name = path.getFileName().toString(); | ||
this.path = path; | ||
this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); | ||
} | ||
|
||
private FileSnapshot(String name, byte[] content) { | ||
Objects.requireNonNull(name); | ||
this.name = name; | ||
this.content = content; | ||
this.fileChannel = null; | ||
} | ||
|
||
public Path getPath() { | ||
return path; | ||
} | ||
|
||
public String getName() { | ||
return name; | ||
} | ||
|
||
public long getContentLength() throws IOException { | ||
return fileChannel == null ? content.length : fileChannel.size(); | ||
} | ||
|
||
public InputStream inputStream() throws IOException { | ||
return fileChannel != null | ||
? new BufferedChecksumStreamInput( | ||
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()), | ||
path.toString() | ||
) | ||
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(name, content, path); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
FileSnapshot other = (FileSnapshot) o; | ||
return Objects.equals(this.name, other.name) | ||
&& Objects.equals(this.content, other.content) | ||
&& Objects.equals(this.path, other.path); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return new StringBuilder("FileInfo [").append(" name = ") | ||
.append(name) | ||
.append(", path = ") | ||
.append(path.toUri()) | ||
.append("]") | ||
.toString(); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
IOUtils.close(fileChannel); | ||
} | ||
|
||
/** | ||
* Snapshot of a single file with primary term that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public static class TransferFileSnapshot extends FileSnapshot { | ||
Bukhtawar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private final long primaryTerm; | ||
|
||
public TransferFileSnapshot(Path path, long primaryTerm) throws IOException { | ||
super(path); | ||
this.primaryTerm = primaryTerm; | ||
} | ||
|
||
public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { | ||
super(name, content); | ||
this.primaryTerm = primaryTerm; | ||
} | ||
|
||
public long getPrimaryTerm() { | ||
return primaryTerm; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(primaryTerm, super.hashCode()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (super.equals(o)) { | ||
if (this == o) return true; | ||
if (getClass() != o.getClass()) return false; | ||
TransferFileSnapshot other = (TransferFileSnapshot) o; | ||
return Objects.equals(this.primaryTerm, other.primaryTerm); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
/** | ||
* Snapshot of a single .tlg file that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public static final class TranslogFileSnapshot extends TransferFileSnapshot { | ||
|
||
private final long generation; | ||
|
||
public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException { | ||
super(path, primaryTerm); | ||
this.generation = generation; | ||
} | ||
|
||
public long getGeneration() { | ||
return generation; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(generation, super.hashCode()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (super.equals(o)) { | ||
if (this == o) return true; | ||
if (getClass() != o.getClass()) return false; | ||
TranslogFileSnapshot other = (TranslogFileSnapshot) o; | ||
return Objects.equals(this.generation, other.generation); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
/** | ||
* Snapshot of a single .ckp file that gets transferred | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public static final class CheckpointFileSnapshot extends TransferFileSnapshot { | ||
|
||
private final long generation; | ||
|
||
private final long minTranslogGeneration; | ||
|
||
public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException { | ||
super(path, primaryTerm); | ||
this.minTranslogGeneration = minTranslogGeneration; | ||
this.generation = generation; | ||
} | ||
|
||
public long getGeneration() { | ||
return generation; | ||
} | ||
|
||
public long getMinTranslogGeneration() { | ||
return minTranslogGeneration; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(generation, minTranslogGeneration, super.hashCode()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (super.equals(o)) { | ||
if (this == o) return true; | ||
if (getClass() != o.getClass()) return false; | ||
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; | ||
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration) | ||
&& Objects.equals(this.generation, other.generation); | ||
} | ||
return false; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.translog.transfer; | ||
|
||
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; | ||
|
||
/** | ||
* Exception when a single file transfer encounters a failure | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class FileTransferException extends RuntimeException { | ||
|
||
private final TransferFileSnapshot fileSnapshot; | ||
|
||
public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) { | ||
super(cause); | ||
this.fileSnapshot = fileSnapshot; | ||
} | ||
|
||
public TransferFileSnapshot getFileSnapshot() { | ||
return fileSnapshot; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should go in the
Unreleased 2.x
section provided this will be backported.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack