Skip to content

Commit

Permalink
Simplify FileSnapshots and handle precommit checks
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed Sep 11, 2022
1 parent 4ed18dd commit a77c864
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- Introduce remote translog transfer support([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorSer
}

@Override
public void uploadFileAsync(
public void uploadBlobAsync(
final TransferFileSnapshot fileSnapshot,
Iterable<String> remoteTransferPath,
ActionListener<TransferFileSnapshot> listener
Expand All @@ -64,7 +64,7 @@ public void uploadFileAsync(
}

@Override
public void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException {
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException {
assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@

import org.opensearch.common.Nullable;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

/**
* Snapshot of a single file that gets transferred
Expand All @@ -27,19 +32,26 @@ public class FileSnapshot {
@Nullable
private Path path;

public FileSnapshot(String name, Path path, long checksum, byte[] content) {
this.name = name;
public FileSnapshot(Path path) throws IOException {
Objects.requireNonNull(path);
this.name = path.getFileName().toString();
this.path = path;
this.checksum = checksum;
this.content = content;
this.contentLength = content.length;
try (CheckedInputStream stream = new CheckedInputStream(Files.newInputStream(path), new CRC32())) {
this.content = stream.readAllBytes();
this.checksum = stream.getChecksum().getValue();
this.contentLength = content.length;
}
}

public FileSnapshot(String name, long checksum, byte[] content) {
public FileSnapshot(String name, byte[] content) throws IOException {
Objects.requireNonNull(content);
Objects.requireNonNull(name);
this.name = name;
this.checksum = checksum;
this.content = content;
this.contentLength = content.length;
try (CheckedInputStream stream = new CheckedInputStream(new ByteArrayInputStream(content), new CRC32())) {
this.content = stream.readAllBytes();
this.checksum = stream.getChecksum().getValue();
this.contentLength = content.length;
}
}

public Path getPath() {
Expand Down Expand Up @@ -80,20 +92,29 @@ public boolean equals(Object o) {

@Override
public String toString() {
return new StringBuilder("FileInfo [").append(name).append(path.toUri()).append(checksum).append(contentLength).toString();
return new StringBuilder("FileInfo [").append(" name = ")
.append(name)
.append(", path = ")
.append(path.toUri())
.append(", checksum = ")
.append(checksum)
.append(", contentLength = ")
.append(contentLength)
.append("]")
.toString();
}

public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;

public TransferFileSnapshot(String name, Path path, long checksum, byte[] content, long primaryTerm) {
super(name, path, checksum, content);
public TransferFileSnapshot(Path path, long primaryTerm) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
}

public TransferFileSnapshot(String name, long checksum, byte[] content, long primaryTerm) {
super(name, checksum, content);
public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException {
super(name, content);
this.primaryTerm = primaryTerm;
}

Expand Down Expand Up @@ -122,8 +143,8 @@ public static class TranslogFileSnapshot extends TransferFileSnapshot {

private final long generation;

public TranslogFileSnapshot(long primaryTerm, long generation, String name, Path path, long checksum, byte[] content) {
super(name, path, checksum, content, primaryTerm);
public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException {
super(path, primaryTerm);
this.generation = generation;
}

Expand Down Expand Up @@ -152,8 +173,8 @@ public static class CheckpointFileSnapshot extends TransferFileSnapshot {

private final long minTranslogGeneration;

public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, String name, Path path, long checksum, byte[] content) {
super(name, path, checksum, content, primaryTerm);
public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, Path path) throws IOException {
super(path, primaryTerm);
this.minTranslogGeneration = minTranslogGeneration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
*/
public interface TransferService {

void uploadFileAsync(
void uploadBlobAsync(
final TransferFileSnapshot fileSnapshot,
Iterable<String> remotePath,
ActionListener<TransferFileSnapshot> listener
);

void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException;
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.translog.TranslogReader;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
Expand All @@ -21,8 +19,6 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
Expand All @@ -42,18 +38,18 @@ public TransferSnapshotProvider(
long generation,
Path location,
List<TranslogReader> readers,
Function<Long, String> checkpointFileNameMapper
Function<Long, String> checkpointGenFileNameMapper
) throws IOException {
translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(primaryTerm, generation, readers.size());
for (TranslogReader reader : readers) {
final long readerGeneration = reader.getGeneration();
final long readerPrimaryTerm = reader.getPrimaryTerm();
final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration();
Path translogPath = reader.path();
Path checkpointPath = location.resolve(checkpointFileNameMapper.apply(readerGeneration));
Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration));
translogTransferSnapshot.add(
buildTranslogFileInfo(translogPath.toFile(), readerPrimaryTerm, readerGeneration),
buildCheckpointFileInfo(checkpointPath.toFile(), readerPrimaryTerm, minTranslogGeneration)
new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath),
new CheckpointFileSnapshot(readerPrimaryTerm, minTranslogGeneration, checkpointPath)
);
}
}
Expand All @@ -63,26 +59,6 @@ public TranslogCheckpointTransferSnapshot get() {
return translogTransferSnapshot;
}

private TranslogFileSnapshot buildTranslogFileInfo(File file, long primaryTerm, long generation) throws IOException {
TranslogFileSnapshot fileSnapshot;
try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) {
byte[] content = stream.readAllBytes();
long checksum = stream.getChecksum().getValue();
fileSnapshot = new TranslogFileSnapshot(primaryTerm, generation, file.getName(), file.toPath(), checksum, content);
}
return fileSnapshot;
}

private CheckpointFileSnapshot buildCheckpointFileInfo(File file, long primaryTerm, long minTranslogGeneration) throws IOException {
CheckpointFileSnapshot fileSnapshot;
try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) {
byte[] content = stream.readAllBytes();
long checksum = stream.getChecksum().getValue();
fileSnapshot = new CheckpointFileSnapshot(primaryTerm, minTranslogGeneration, file.getName(), file.toPath(), checksum, content);
}
return fileSnapshot;
}

static class TranslogCheckpointTransferSnapshot implements TransferSnapshot {

private final Set<Tuple<TranslogFileSnapshot, CheckpointFileSnapshot>> translogCheckpointFileInfoTupleSet;
Expand Down Expand Up @@ -115,6 +91,7 @@ private void assertInvariants() {
assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count";
}

@Override
public Set<TransferFileSnapshot> getTranslogFileSnapshots() {
return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v1()).collect(Collectors.toSet());
}
Expand All @@ -129,8 +106,19 @@ public TranslogTransferMetadata getTranslogTransferMetadata() {
);
}

@Override
public Set<TransferFileSnapshot> getCheckpointFileSnapshots() {
return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v2()).collect(Collectors.toSet());
}

@Override
public String toString() {
return new StringBuilder("TranslogTransferSnapshot [").append(" primary term = ")
.append(primaryTerm)
.append(", generation = ")
.append(generation)
.append(" ]")
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -29,8 +29,6 @@
import java.util.concurrent.TimeoutException;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
Expand All @@ -44,59 +42,58 @@ public class TranslogTransferManager {

private final TransferService transferService;
private final BlobPath remoteBaseTransferPath;
private final BlobPath remoteTransferMetadataPath;
private final FileTransferListener fileTransferListener;
private final UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class);

public TranslogTransferManager(
TransferService transferService,
BlobPath remoteBaseTransferPath,
BlobPath remoteTransferMetadataPath,
FileTransferListener fileTransferListener,
UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter
) {
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteTransferMetadataPath = remoteTransferMetadataPath;
this.fileTransferListener = fileTransferListener;
this.exclusionFilter = exclusionFilter;
}

public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException {
public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener)
throws IOException {
List<Exception> exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount());
try {
Set<TransferFileSnapshot> toUpload = exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots());
toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots()));
if (toUpload.isEmpty()) {
logger.warn("Nothing to upload for transfer size {}", transferSnapshot.getTranslogTransferMetadata().getCount());
return true;
}
final CountDownLatch latch = new CountDownLatch(toUpload.size());
LatchedActionListener<TransferFileSnapshot> latchedActionListener = new LatchedActionListener(
ActionListener.wrap(fileTransferListener::onSuccess, ex -> {
assert ex instanceof FileTransferException;
logger.error("Exception received type {}", ex.getClass(), ex);
logger.error(
() -> new ParameterizedMessage(
"Exception during transfer for file {}",
((FileTransferException) ex).getFileSnapshot().getName()
),
ex
);
FileTransferException e = (FileTransferException) ex;
fileTransferListener.onFailure(e.getFileSnapshot(), ex);
exceptionList.add(ex);
}),
latch
);
toUpload.forEach(
fileSnapshot -> transferService.uploadFileAsync(
fileSnapshot -> transferService.uploadBlobAsync(
fileSnapshot,
remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
latchedActionListener
)
);
try {
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
Exception ex = new TimeoutException(
"Timed out waiting for transfer of generation " + transferSnapshot + " to complete"
);
Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete");
exceptionList.forEach(e -> ex.addSuppressed(e));
throw ex;
}
Expand All @@ -106,7 +103,11 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe
Thread.currentThread().interrupt();
throw ex;
}
transferService.uploadFile(prepareMetadata(transferSnapshot), remoteTransferMetadataPath);
final TransferFileSnapshot transferFileSnapshot = prepareMetadata(transferSnapshot);
transferService.uploadBlob(
prepareMetadata(transferSnapshot),
remoteBaseTransferPath.add(String.valueOf(transferFileSnapshot.getPrimaryTerm()))
);
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
} catch (Exception ex) {
Expand All @@ -133,16 +134,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
TransferFileSnapshot fileSnapshot;
try (BytesStreamOutput output = new BytesStreamOutput()) {
translogTransferMetadata.writeTo(output);
try (
CheckedInputStream stream = new CheckedInputStream(
new ByteArrayInputStream(output.bytes().streamInput().readByteArray()),
new CRC32()
)
) {
byte[] content = stream.readAllBytes();
long checksum = stream.getChecksum().getValue();
fileSnapshot = new TransferFileSnapshot(translogTransferMetadata.getMetadataFileName(), checksum, content, -1);
}
fileSnapshot = new TransferFileSnapshot(
translogTransferMetadata.getMetadataFileName(),
BytesReference.toBytes(output.bytes()),
-1
);

}
return fileSnapshot;
}
Expand Down
Loading

0 comments on commit a77c864

Please sign in to comment.