From b12d7932ff50d17b454e9d4a99e0277f67968ca4 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 11 Sep 2022 01:33:30 +0530 Subject: [PATCH 01/13] Introduce remote translog transfer support Signed-off-by: Bukhtawar Khan --- .../opensearch/index/translog/Checkpoint.java | 6 +- .../index/translog/TranslogReader.java | 2 +- .../transfer/BlobStoreTransferService.java | 83 ++++++++ .../index/translog/transfer/FileSnapshot.java | 180 ++++++++++++++++++ .../transfer/FileTransferException.java | 35 ++++ .../translog/transfer/TransferService.java | 31 +++ .../translog/transfer/TransferSnapshot.java | 27 +++ .../transfer/TransferSnapshotProvider.java | 136 +++++++++++++ .../transfer/TranslogTransferManager.java | 149 +++++++++++++++ .../transfer/TranslogTransferMetadata.java | 110 +++++++++++ .../listener/FileTransferListener.java | 23 +++ .../listener/TranslogTransferListener.java | 25 +++ 12 files changed, 805 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java diff --git a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java index ade28791b2e27..c7339ea1dac8a 100644 --- a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java @@ -59,7 +59,7 @@ * * @opensearch.internal */ -final class Checkpoint { +final public class Checkpoint { final long offset; final int numOps; @@ -262,6 +262,10 @@ public synchronized byte[] toByteArray() { return byteOutputStream.toByteArray(); } + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index 9d22fe0a498eb..205229949da77 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -138,7 +138,7 @@ public int totalOperations() { } @Override - final Checkpoint getCheckpoint() { + final public Checkpoint getCheckpoint() { return checkpoint; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java new file mode 100644 index 0000000000000..86e26d56555a1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +/** + * Service that handles remote transfer of translog and checkpoint files + * + * @opensearch.internal + */ +public class BlobStoreTransferService implements TransferService { + + private final BlobStore blobStore; + private final ExecutorService executorService; + + private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); + + public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) { + this.blobStore = blobStore; + this.executorService = executorService; + } + + @Override + public void uploadFileAsync( + final TransferFileSnapshot fileSnapshot, + Iterable remoteTransferPath, + ActionListener listener + ) { + assert remoteTransferPath instanceof BlobPath; + BlobPath blobPath = (BlobPath) remoteTransferPath; + executorService.execute(ActionRunnable.wrap(listener, l -> { + try { + blobStore.blobContainer(blobPath) + .writeBlobAtomic( + fileSnapshot.getName(), + new ByteArrayInputStream(fileSnapshot.getContent()), + fileSnapshot.getContentLength(), + true + ); + l.onResponse(fileSnapshot); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); + l.onFailure(new FileTransferException(fileSnapshot, e)); + } + })); + } + + @Override + public void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { + assert remoteTransferPath instanceof BlobPath; + BlobPath blobPath = (BlobPath) remoteTransferPath; + try { + blobStore.blobContainer(blobPath) + .writeBlobAtomic( + fileSnapshot.getName(), + new ByteArrayInputStream(fileSnapshot.getContent()), + fileSnapshot.getContentLength(), + true + ); + } catch (Exception ex) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); + throw ex; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java new file mode 100644 index 0000000000000..4658ace529cea --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.Nullable; + +import java.nio.file.Path; +import java.util.Objects; + +/** + * Snapshot of a single file that gets transferred + * + * @opensearch.internal + */ +public class FileSnapshot { + + private final long checksum; + private final byte[] content; + private final String name; + private final long contentLength; + @Nullable + private Path path; + + public FileSnapshot(String name, Path path, long checksum, byte[] content) { + this.name = name; + this.path = path; + this.checksum = checksum; + this.content = content; + this.contentLength = content.length; + } + + public FileSnapshot(String name, long checksum, byte[] content) { + this.name = name; + this.checksum = checksum; + this.content = content; + this.contentLength = content.length; + } + + public Path getPath() { + return path; + } + + public String getName() { + return name; + } + + public byte[] getContent() { + return content; + } + + public long getChecksum() { + return checksum; + } + + public long getContentLength() { + return contentLength; + } + + @Override + public int hashCode() { + return Objects.hash(name, path, checksum, contentLength); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FileSnapshot other = (FileSnapshot) o; + return Objects.equals(this.name, other.name) + && Objects.equals(this.path, other.path) + && Objects.equals(this.checksum, other.checksum) + && Objects.equals(this.contentLength, other.contentLength); + } + + @Override + public String toString() { + return new StringBuilder("FileInfo [").append(name).append(path.toUri()).append(checksum).append(contentLength).toString(); + } + + public static class TransferFileSnapshot extends FileSnapshot { + + private final long primaryTerm; + + public TransferFileSnapshot(String name, Path path, long checksum, byte[] content, long primaryTerm) { + super(name, path, checksum, content); + this.primaryTerm = primaryTerm; + } + + public TransferFileSnapshot(String name, long checksum, byte[] content, long primaryTerm) { + super(name, checksum, content); + this.primaryTerm = primaryTerm; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + @Override + public int hashCode() { + return Objects.hash(primaryTerm, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TransferFileSnapshot other = (TransferFileSnapshot) o; + return Objects.equals(this.primaryTerm, other.primaryTerm); + } + return false; + } + } + + public static class TranslogFileSnapshot extends TransferFileSnapshot { + + private final long generation; + + public TranslogFileSnapshot(long primaryTerm, long generation, String name, Path path, long checksum, byte[] content) { + super(name, path, checksum, content, primaryTerm); + this.generation = generation; + } + + public long getGeneration() { + return generation; + } + + @Override + public int hashCode() { + return Objects.hash(generation, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TranslogFileSnapshot other = (TranslogFileSnapshot) o; + return Objects.equals(this.generation, other.generation); + } + return false; + } + } + + public static class CheckpointFileSnapshot extends TransferFileSnapshot { + + private final long minTranslogGeneration; + + public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, String name, Path path, long checksum, byte[] content) { + super(name, path, checksum, content, primaryTerm); + this.minTranslogGeneration = minTranslogGeneration; + } + + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + + @Override + public int hashCode() { + return Objects.hash(minTranslogGeneration, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; + return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration); + } + return false; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java new file mode 100644 index 0000000000000..7b30be7ca639a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +/** + * Exception when a single file transfer encounters a failure + * + * @opensearch.internal + */ +public class FileTransferException extends RuntimeException { + + private final TransferFileSnapshot fileSnapshot; + + public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) { + super(cause); + this.fileSnapshot = fileSnapshot; + } + + public FileTransferException(TransferFileSnapshot fileSnapshot, String message, Throwable cause) { + super(message, cause); + this.fileSnapshot = fileSnapshot; + } + + public TransferFileSnapshot getFileSnapshot() { + return fileSnapshot; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java new file mode 100644 index 0000000000000..29382a0318aba --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.IOException; + +/** + * Interface for the translog transfer service responsible for interacting with a remote store + * + * @opensearch.internal + */ +public interface TransferService { + + void uploadFileAsync( + final TransferFileSnapshot fileSnapshot, + Iterable remotePath, + ActionListener listener + ); + + void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java new file mode 100644 index 0000000000000..3da9298dfcea0 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.util.Set; + +/** + * The snapshot of the files and it's metadata that is transferred to the {@link TransferService} + * + * @opensearch.internal + */ +public interface TransferSnapshot { + + Set getCheckpointFileSnapshots(); + + Set getTranslogFileSnapshots(); + + TranslogTransferMetadata getTranslogTransferMetadata(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java new file mode 100644 index 0000000000000..373cb732cf0d3 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.index.translog.TranslogReader; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; + +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; + +/** + * Provider for a {@link TransferSnapshot} which builds the snapshot from the translog and checkpoint files present on the local-disk + * + * @opensearch.internal + */ +public class TransferSnapshotProvider implements Supplier { + + private final TranslogCheckpointTransferSnapshot translogTransferSnapshot; + + public TransferSnapshotProvider( + long primaryTerm, + long generation, + Path location, + List readers, + Function checkpointFileNameMapper + ) throws IOException { + translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(primaryTerm, generation, readers.size()); + for (TranslogReader reader : readers) { + final long readerGeneration = reader.getGeneration(); + final long readerPrimaryTerm = reader.getPrimaryTerm(); + final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); + Path translogPath = reader.path(); + Path checkpointPath = location.resolve(checkpointFileNameMapper.apply(readerGeneration)); + translogTransferSnapshot.add( + buildTranslogFileInfo(translogPath.toFile(), readerPrimaryTerm, readerGeneration), + buildCheckpointFileInfo(checkpointPath.toFile(), readerPrimaryTerm, minTranslogGeneration) + ); + } + } + + public TranslogCheckpointTransferSnapshot get() { + translogTransferSnapshot.assertInvariants(); + return translogTransferSnapshot; + } + + private TranslogFileSnapshot buildTranslogFileInfo(File file, long primaryTerm, long generation) throws IOException { + TranslogFileSnapshot fileSnapshot; + try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { + byte[] content = stream.readAllBytes(); + long checksum = stream.getChecksum().getValue(); + fileSnapshot = new TranslogFileSnapshot(primaryTerm, generation, file.getName(), file.toPath(), checksum, content); + } + return fileSnapshot; + } + + private CheckpointFileSnapshot buildCheckpointFileInfo(File file, long primaryTerm, long minTranslogGeneration) throws IOException { + CheckpointFileSnapshot fileSnapshot; + try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { + byte[] content = stream.readAllBytes(); + long checksum = stream.getChecksum().getValue(); + fileSnapshot = new CheckpointFileSnapshot(primaryTerm, minTranslogGeneration, file.getName(), file.toPath(), checksum, content); + } + return fileSnapshot; + } + + static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { + + private final Set> translogCheckpointFileInfoTupleSet; + private final int size; + private CheckpointFileSnapshot latestCheckPointFileSnapshot; + private TranslogFileSnapshot latestTranslogFileSnapshot; + private long generation; + private long highestGeneration; + private long primaryTerm; + + TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { + translogCheckpointFileInfoTupleSet = new HashSet<>(size); + this.size = size; + this.generation = generation; + this.primaryTerm = primaryTerm; + } + + private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { + translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); + if (highestGeneration < translogFileSnapshot.getGeneration()) { + latestCheckPointFileSnapshot = checkPointFileSnapshot; + latestTranslogFileSnapshot = translogFileSnapshot; + highestGeneration = translogFileSnapshot.getGeneration(); + } + } + + private void assertInvariants() { + assert this.primaryTerm == latestTranslogFileSnapshot.getPrimaryTerm() : "inconsistent primary term"; + assert this.generation == highestGeneration : "inconsistent generation"; + assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count"; + } + + public Set getTranslogFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v1()).collect(Collectors.toSet()); + } + + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata( + latestTranslogFileSnapshot.getPrimaryTerm(), + latestTranslogFileSnapshot.getGeneration(), + latestCheckPointFileSnapshot.getMinTranslogGeneration(), + translogCheckpointFileInfoTupleSet.size() + ); + } + + public Set getCheckpointFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v2()).collect(Collectors.toSet()); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java new file mode 100644 index 0000000000000..9de1a3bd5e187 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.index.translog.transfer.listener.FileTransferListener; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; + +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; + +/** + * The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService} + * + * @opensearch.internal + */ +public class TranslogTransferManager { + + private final TransferService transferService; + private final BlobPath remoteBaseTransferPath; + private final BlobPath remoteTransferMetadataPath; + private final FileTransferListener fileTransferListener; + private final UnaryOperator> exclusionFilter; + private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; + + private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); + + public TranslogTransferManager( + TransferService transferService, + BlobPath remoteBaseTransferPath, + BlobPath remoteTransferMetadataPath, + FileTransferListener fileTransferListener, + UnaryOperator> exclusionFilter + ) { + this.transferService = transferService; + this.remoteBaseTransferPath = remoteBaseTransferPath; + this.remoteTransferMetadataPath = remoteTransferMetadataPath; + this.fileTransferListener = fileTransferListener; + this.exclusionFilter = exclusionFilter; + } + + public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { + List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + try { + Set toUpload = exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots()); + toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots())); + if (toUpload.isEmpty()) { + logger.warn("Nothing to upload for transfer size {}", transferSnapshot.getTranslogTransferMetadata().getCount()); + return true; + } + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + LatchedActionListener latchedActionListener = new LatchedActionListener( + ActionListener.wrap(fileTransferListener::onSuccess, ex -> { + assert ex instanceof FileTransferException; + logger.error("Exception received type {}", ex.getClass(), ex); + FileTransferException e = (FileTransferException) ex; + fileTransferListener.onFailure(e.getFileSnapshot(), ex); + exceptionList.add(ex); + }), + latch + ); + toUpload.forEach( + fileSnapshot -> transferService.uploadFileAsync( + fileSnapshot, + remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), + latchedActionListener + ) + ); + try { + if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { + Exception ex = new TimeoutException( + "Timed out waiting for transfer of generation " + transferSnapshot + " to complete" + ); + exceptionList.forEach(e -> ex.addSuppressed(e)); + throw ex; + } + } catch (InterruptedException ex) { + logger.error(() -> new ParameterizedMessage("Time failed for snapshot {}", transferSnapshot), ex); + exceptionList.forEach(e -> ex.addSuppressed(e)); + Thread.currentThread().interrupt(); + throw ex; + } + transferService.uploadFile(prepareMetadata(transferSnapshot), remoteTransferMetadataPath); + translogTransferListener.onUploadComplete(transferSnapshot); + return true; + } catch (Exception ex) { + logger.error(() -> new ParameterizedMessage("Transfer failed for snapshot {}", transferSnapshot), ex); + translogTransferListener.onUploadFailed(transferSnapshot, ex); + return false; + } + } + + private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { + assert transferSnapshot.getTranslogFileSnapshots() instanceof TranslogFileSnapshot; + Map generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { + assert s instanceof TransferFileSnapshot; + return (TranslogFileSnapshot) s; + }) + .collect( + Collectors.toMap( + snapshot -> String.valueOf(snapshot.getGeneration()), + snapshot -> String.valueOf(snapshot.getPrimaryTerm()) + ) + ); + TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); + translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + TransferFileSnapshot fileSnapshot; + try (BytesStreamOutput output = new BytesStreamOutput()) { + translogTransferMetadata.writeTo(output); + try ( + CheckedInputStream stream = new CheckedInputStream( + new ByteArrayInputStream(output.bytes().streamInput().readByteArray()), + new CRC32() + ) + ) { + byte[] content = stream.readAllBytes(); + long checksum = stream.getChecksum().getValue(); + fileSnapshot = new TransferFileSnapshot(translogTransferMetadata.getMetadataFileName(), checksum, content, -1); + } + } + return fileSnapshot; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java new file mode 100644 index 0000000000000..4d0aca03b9b00 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.util.SetOnce; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +/** + * The metadata associated with every transfer {@link TransferSnapshot} + * + * @opensearch.internal + */ +public class TranslogTransferMetadata implements Writeable { + + private final long primaryTerm; + + private final long generation; + + private final long minTranslogGeneration; + + private final long timeStamp; + + private final int count; + + private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + + private static final String METADATA_SEPARATOR = "__"; + + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { + this.primaryTerm = primaryTerm; + this.generation = generation; + this.minTranslogGeneration = minTranslogGeneration; + this.timeStamp = System.currentTimeMillis(); + this.count = count; + } + + TranslogTransferMetadata(StreamInput in) throws IOException { + this.primaryTerm = in.readLong(); + this.generation = in.readLong(); + this.minTranslogGeneration = in.readLong(); + this.count = in.readInt(); + this.timeStamp = in.readLong(); + this.generationToPrimaryTermMapper.set(in.readMap()); + + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + public long getGeneration() { + return generation; + } + + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + + public int getCount() { + return count; + } + + public void setGenerationToPrimaryTermMapper(Map generationToPrimaryTermMap) { + generationToPrimaryTermMapper.set(generationToPrimaryTermMap); + } + + public String getMetadataFileName() { + return String.join( + METADATA_SEPARATOR, + Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation), String.valueOf(timeStamp)) + ); + } + + @Override + public int hashCode() { + return Objects.hash(primaryTerm, generation, timeStamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TranslogTransferMetadata other = (TranslogTransferMetadata) o; + return Objects.equals(this.primaryTerm, other.primaryTerm) + && Objects.equals(this.generation, other.generation) + && Objects.equals(this.timeStamp, other.timeStamp); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(primaryTerm); + out.writeLong(generation); + out.writeLong(minTranslogGeneration); + out.writeLong(timeStamp); + out.writeMap(generationToPrimaryTermMapper.get()); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java new file mode 100644 index 0000000000000..026e7c854d9ec --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer.listener; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +/** + * The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} + * + * @opensearch.internal + */ +public interface FileTransferListener { + + void onSuccess(TransferFileSnapshot fileSnapshot); + + void onFailure(TransferFileSnapshot fileSnapshot, Exception e); +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java new file mode 100644 index 0000000000000..6c9822e6cd8eb --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer.listener; + +import org.opensearch.index.translog.transfer.TransferSnapshot; + +import java.io.IOException; + +/** + * The listener to be invoked on the completion or failure of a {@link TransferSnapshot} + * + * @opensearch.internal + */ +public interface TranslogTransferListener { + + void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException; + + void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; +} From 5ced7a71c371fdc38c534a9941a97fea5f0b4fed Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 12 Sep 2022 01:37:08 +0530 Subject: [PATCH 02/13] Simplify FileSnapshots and handle precommit checks and tests Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 2 + .../transfer/BlobStoreTransferService.java | 4 +- .../index/translog/transfer/FileSnapshot.java | 63 +++++--- .../translog/transfer/TransferService.java | 4 +- .../transfer/TransferSnapshotProvider.java | 54 +++---- .../transfer/TranslogTransferManager.java | 64 ++++---- .../BlobStoreTransferServiceTests.java | 124 +++++++++++++++ .../TranslogTransferManagerTests.java | 145 ++++++++++++++++++ 8 files changed, 369 insertions(+), 91 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java create mode 100644 server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c7b736205b87b..6d5a932317033 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) - Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761)) - Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839)) +- Introduce remote translog transfer support([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) + ### Deprecated ### Removed - Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568)) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 86e26d56555a1..014e94673b477 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -39,7 +39,7 @@ public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorSer } @Override - public void uploadFileAsync( + public void uploadBlobAsync( final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, ActionListener listener @@ -64,7 +64,7 @@ public void uploadFileAsync( } @Override - public void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { + public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; try { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 4658ace529cea..bc12cc7fe237b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -10,8 +10,13 @@ import org.opensearch.common.Nullable; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Objects; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; /** * Snapshot of a single file that gets transferred @@ -27,19 +32,26 @@ public class FileSnapshot { @Nullable private Path path; - public FileSnapshot(String name, Path path, long checksum, byte[] content) { - this.name = name; + public FileSnapshot(Path path) throws IOException { + Objects.requireNonNull(path); + this.name = path.getFileName().toString(); this.path = path; - this.checksum = checksum; - this.content = content; - this.contentLength = content.length; + try (CheckedInputStream stream = new CheckedInputStream(Files.newInputStream(path), new CRC32())) { + this.content = stream.readAllBytes(); + this.checksum = stream.getChecksum().getValue(); + this.contentLength = content.length; + } } - public FileSnapshot(String name, long checksum, byte[] content) { + public FileSnapshot(String name, byte[] content) throws IOException { + Objects.requireNonNull(content); + Objects.requireNonNull(name); this.name = name; - this.checksum = checksum; - this.content = content; - this.contentLength = content.length; + try (CheckedInputStream stream = new CheckedInputStream(new ByteArrayInputStream(content), new CRC32())) { + this.content = stream.readAllBytes(); + this.checksum = stream.getChecksum().getValue(); + this.contentLength = content.length; + } } public Path getPath() { @@ -80,20 +92,29 @@ public boolean equals(Object o) { @Override public String toString() { - return new StringBuilder("FileInfo [").append(name).append(path.toUri()).append(checksum).append(contentLength).toString(); + return new StringBuilder("FileInfo [").append(" name = ") + .append(name) + .append(", path = ") + .append(path.toUri()) + .append(", checksum = ") + .append(checksum) + .append(", contentLength = ") + .append(contentLength) + .append("]") + .toString(); } public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; - public TransferFileSnapshot(String name, Path path, long checksum, byte[] content, long primaryTerm) { - super(name, path, checksum, content); + public TransferFileSnapshot(Path path, long primaryTerm) throws IOException { + super(path); this.primaryTerm = primaryTerm; } - public TransferFileSnapshot(String name, long checksum, byte[] content, long primaryTerm) { - super(name, checksum, content); + public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { + super(name, content); this.primaryTerm = primaryTerm; } @@ -110,7 +131,7 @@ public int hashCode() { public boolean equals(Object o) { if (super.equals(o)) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (getClass() != o.getClass()) return false; TransferFileSnapshot other = (TransferFileSnapshot) o; return Objects.equals(this.primaryTerm, other.primaryTerm); } @@ -122,8 +143,8 @@ public static class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; - public TranslogFileSnapshot(long primaryTerm, long generation, String name, Path path, long checksum, byte[] content) { - super(name, path, checksum, content, primaryTerm); + public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException { + super(path, primaryTerm); this.generation = generation; } @@ -140,7 +161,7 @@ public int hashCode() { public boolean equals(Object o) { if (super.equals(o)) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (getClass() != o.getClass()) return false; TranslogFileSnapshot other = (TranslogFileSnapshot) o; return Objects.equals(this.generation, other.generation); } @@ -152,8 +173,8 @@ public static class CheckpointFileSnapshot extends TransferFileSnapshot { private final long minTranslogGeneration; - public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, String name, Path path, long checksum, byte[] content) { - super(name, path, checksum, content, primaryTerm); + public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, Path path) throws IOException { + super(path, primaryTerm); this.minTranslogGeneration = minTranslogGeneration; } @@ -170,7 +191,7 @@ public int hashCode() { public boolean equals(Object o) { if (super.equals(o)) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (getClass() != o.getClass()) return false; CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 29382a0318aba..39c909fdf8f99 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -20,12 +20,12 @@ */ public interface TransferService { - void uploadFileAsync( + void uploadBlobAsync( final TransferFileSnapshot fileSnapshot, Iterable remotePath, ActionListener listener ); - void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java index 373cb732cf0d3..e8bfe84734ed1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java @@ -11,8 +11,6 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.index.translog.TranslogReader; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; @@ -21,8 +19,6 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; @@ -42,7 +38,7 @@ public TransferSnapshotProvider( long generation, Path location, List readers, - Function checkpointFileNameMapper + Function checkpointGenFileNameMapper ) throws IOException { translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(primaryTerm, generation, readers.size()); for (TranslogReader reader : readers) { @@ -50,10 +46,10 @@ public TransferSnapshotProvider( final long readerPrimaryTerm = reader.getPrimaryTerm(); final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); Path translogPath = reader.path(); - Path checkpointPath = location.resolve(checkpointFileNameMapper.apply(readerGeneration)); + Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); translogTransferSnapshot.add( - buildTranslogFileInfo(translogPath.toFile(), readerPrimaryTerm, readerGeneration), - buildCheckpointFileInfo(checkpointPath.toFile(), readerPrimaryTerm, minTranslogGeneration) + new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), + new CheckpointFileSnapshot(readerPrimaryTerm, minTranslogGeneration, checkpointPath) ); } } @@ -63,35 +59,15 @@ public TranslogCheckpointTransferSnapshot get() { return translogTransferSnapshot; } - private TranslogFileSnapshot buildTranslogFileInfo(File file, long primaryTerm, long generation) throws IOException { - TranslogFileSnapshot fileSnapshot; - try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { - byte[] content = stream.readAllBytes(); - long checksum = stream.getChecksum().getValue(); - fileSnapshot = new TranslogFileSnapshot(primaryTerm, generation, file.getName(), file.toPath(), checksum, content); - } - return fileSnapshot; - } - - private CheckpointFileSnapshot buildCheckpointFileInfo(File file, long primaryTerm, long minTranslogGeneration) throws IOException { - CheckpointFileSnapshot fileSnapshot; - try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { - byte[] content = stream.readAllBytes(); - long checksum = stream.getChecksum().getValue(); - fileSnapshot = new CheckpointFileSnapshot(primaryTerm, minTranslogGeneration, file.getName(), file.toPath(), checksum, content); - } - return fileSnapshot; - } - static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { private final Set> translogCheckpointFileInfoTupleSet; private final int size; private CheckpointFileSnapshot latestCheckPointFileSnapshot; private TranslogFileSnapshot latestTranslogFileSnapshot; - private long generation; + private final long generation; private long highestGeneration; - private long primaryTerm; + private final long primaryTerm; TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { translogCheckpointFileInfoTupleSet = new HashSet<>(size); @@ -115,8 +91,9 @@ private void assertInvariants() { assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count"; } + @Override public Set getTranslogFileSnapshots() { - return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v1()).collect(Collectors.toSet()); + return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); } @Override @@ -125,12 +102,23 @@ public TranslogTransferMetadata getTranslogTransferMetadata() { latestTranslogFileSnapshot.getPrimaryTerm(), latestTranslogFileSnapshot.getGeneration(), latestCheckPointFileSnapshot.getMinTranslogGeneration(), - translogCheckpointFileInfoTupleSet.size() + translogCheckpointFileInfoTupleSet.size() * 2 ); } + @Override public Set getCheckpointFileSnapshots() { - return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v2()).collect(Collectors.toSet()); + return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v2).collect(Collectors.toSet()); + } + + @Override + public String toString() { + return new StringBuilder("TranslogTransferSnapshot [").append(" primary term = ") + .append(primaryTerm) + .append(", generation = ") + .append(generation) + .append(" ]") + .toString(); } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 9de1a3bd5e187..edf8bbeb584a8 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -14,13 +14,14 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,8 +30,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; @@ -44,9 +43,9 @@ public class TranslogTransferManager { private final TransferService transferService; private final BlobPath remoteBaseTransferPath; - private final BlobPath remoteTransferMetadataPath; private final FileTransferListener fileTransferListener; private final UnaryOperator> exclusionFilter; + private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); @@ -54,31 +53,33 @@ public class TranslogTransferManager { public TranslogTransferManager( TransferService transferService, BlobPath remoteBaseTransferPath, - BlobPath remoteTransferMetadataPath, FileTransferListener fileTransferListener, UnaryOperator> exclusionFilter ) { this.transferService = transferService; this.remoteBaseTransferPath = remoteBaseTransferPath; - this.remoteTransferMetadataPath = remoteTransferMetadataPath; this.fileTransferListener = fileTransferListener; this.exclusionFilter = exclusionFilter; } - public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { + public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) + throws IOException { List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); try { - Set toUpload = exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots()); + toUpload.addAll(exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots())); toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots())); - if (toUpload.isEmpty()) { - logger.warn("Nothing to upload for transfer size {}", transferSnapshot.getTranslogTransferMetadata().getCount()); - return true; - } final CountDownLatch latch = new CountDownLatch(toUpload.size()); - LatchedActionListener latchedActionListener = new LatchedActionListener( + LatchedActionListener latchedActionListener = new LatchedActionListener<>( ActionListener.wrap(fileTransferListener::onSuccess, ex -> { assert ex instanceof FileTransferException; - logger.error("Exception received type {}", ex.getClass(), ex); + logger.error( + () -> new ParameterizedMessage( + "Exception during transfer for file {}", + ((FileTransferException) ex).getFileSnapshot().getName() + ), + ex + ); FileTransferException e = (FileTransferException) ex; fileTransferListener.onFailure(e.getFileSnapshot(), ex); exceptionList.add(ex); @@ -86,7 +87,7 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe latch ); toUpload.forEach( - fileSnapshot -> transferService.uploadFileAsync( + fileSnapshot -> transferService.uploadBlobAsync( fileSnapshot, remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener @@ -94,19 +95,21 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe ); try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { - Exception ex = new TimeoutException( - "Timed out waiting for transfer of generation " + transferSnapshot + " to complete" - ); - exceptionList.forEach(e -> ex.addSuppressed(e)); + Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); + exceptionList.forEach(ex::addSuppressed); throw ex; } } catch (InterruptedException ex) { logger.error(() -> new ParameterizedMessage("Time failed for snapshot {}", transferSnapshot), ex); - exceptionList.forEach(e -> ex.addSuppressed(e)); + exceptionList.forEach(ex::addSuppressed); Thread.currentThread().interrupt(); throw ex; } - transferService.uploadFile(prepareMetadata(transferSnapshot), remoteTransferMetadataPath); + final TransferFileSnapshot transferFileSnapshot = prepareMetadata(transferSnapshot); + transferService.uploadBlob( + prepareMetadata(transferSnapshot), + remoteBaseTransferPath.add(String.valueOf(transferFileSnapshot.getPrimaryTerm())) + ); translogTransferListener.onUploadComplete(transferSnapshot); return true; } catch (Exception ex) { @@ -117,9 +120,8 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe } private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { - assert transferSnapshot.getTranslogFileSnapshots() instanceof TranslogFileSnapshot; Map generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { - assert s instanceof TransferFileSnapshot; + assert s instanceof TranslogFileSnapshot; return (TranslogFileSnapshot) s; }) .collect( @@ -133,16 +135,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) TransferFileSnapshot fileSnapshot; try (BytesStreamOutput output = new BytesStreamOutput()) { translogTransferMetadata.writeTo(output); - try ( - CheckedInputStream stream = new CheckedInputStream( - new ByteArrayInputStream(output.bytes().streamInput().readByteArray()), - new CRC32() - ) - ) { - byte[] content = stream.readAllBytes(); - long checksum = stream.getChecksum().getValue(); - fileSnapshot = new TransferFileSnapshot(translogTransferMetadata.getMetadataFileName(), checksum, content, -1); - } + fileSnapshot = new TransferFileSnapshot( + translogTransferMetadata.getMetadataFileName(), + BytesReference.toBytes(output.bytes()), + -1 + ); + } return fileSnapshot; } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java new file mode 100644 index 0000000000000..01942fe9e96a4 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BlobStoreTransferServiceTests extends OpenSearchTestCase { + + private ExecutorService executorService; + + private BlobStoreRepository repository; + + @Override + public void setUp() throws Exception { + super.setUp(); + repository = createRepository(); + executorService = Executors.newFixedThreadPool(1); + } + + public void testUploadBlob() throws IOException { + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + createTempFile(), + randomNonNegativeLong() + ); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + } + + public void testUploadBlobAsync() throws IOException, InterruptedException { + AtomicBoolean succeeded = new AtomicBoolean(false); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + createTempFile(), + randomNonNegativeLong() + ); + CountDownLatch latch = new CountDownLatch(1); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + transferService.uploadBlobAsync(transferFileSnapshot, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + assert succeeded.compareAndSet(false, true); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); + assertEquals(transferFileSnapshot.getChecksum(), fileSnapshot.getChecksum()); + assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch)); + latch.await(100, TimeUnit.MILLISECONDS); + assertEquals(true, succeeded.get()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + repository.stop(); + executorService.shutdown(); + executorService.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + + /** Create a {@link Repository} with a random name **/ + private BlobStoreRepository createRepository() { + Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final FsRepository repository = new FsRepository( + repositoryMetadata, + createEnvironment(), + xContentRegistry(), + clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually + } + }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + repository.start(); + return repository; + } + + /** Create a {@link Environment} with random path.home and path.repo **/ + private Environment createEnvironment() { + Path home = createTempDir(); + return TestEnvironment.newEnvironment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) + .build() + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java new file mode 100644 index 0000000000000..d662333fb32df --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.transfer.listener.FileTransferListener; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +public class TranslogTransferManagerTests extends OpenSearchTestCase { + + private TransferService transferService; + private BlobPath remoteBaseTransferPath; + private long primaryTerm; + private long generation; + private long minTranslogGeneration; + + @Override + public void setUp() throws Exception { + super.setUp(); + primaryTerm = randomNonNegativeLong(); + generation = randomNonNegativeLong(); + minTranslogGeneration = randomLongBetween(0, generation); + remoteBaseTransferPath = new BlobPath().add("base_path"); + transferService = mock(TransferService.class); + } + + @SuppressWarnings("unchecked") + public void testTransferSnapshot() throws IOException { + AtomicInteger fileTransferSucceeded = new AtomicInteger(); + AtomicInteger fileTransferFailed = new AtomicInteger(); + AtomicInteger translogTransferSucceeded = new AtomicInteger(); + AtomicInteger translogTransferFailed = new AtomicInteger(); + + doNothing().when(transferService) + .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[0]); + return null; + }).when(transferService).uploadBlobAsync(any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + transferService, + remoteBaseTransferPath, + new FileTransferListener() { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + } + }, + r -> r + ); + assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) { + translogTransferSucceeded.incrementAndGet(); + } + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + translogTransferFailed.incrementAndGet(); + } + })); + assertEquals(4, fileTransferSucceeded.get()); + assertEquals(0, fileTransferFailed.get()); + assertEquals(1, translogTransferSucceeded.get()); + assertEquals(0, translogTransferFailed.get()); + } + + private TransferSnapshot createTransferSnapshot() { + return new TransferSnapshot() { + @Override + public Set getCheckpointFileSnapshots() { + try { + return Set.of( + new CheckpointFileSnapshot( + primaryTerm, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX) + ), + new CheckpointFileSnapshot( + primaryTerm, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX) + ) + ); + } catch (IOException e) { + throw new AssertionError("Failed to create temp file", e); + } + } + + @Override + public Set getTranslogFileSnapshots() { + try { + return Set.of( + new TranslogFileSnapshot( + primaryTerm, + generation, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX) + ), + new TranslogFileSnapshot( + primaryTerm, + generation - 1, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX) + ) + ); + } catch (IOException e) { + throw new AssertionError("Failed to create temp file", e); + } + } + + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); + } + }; + } +} From 0792470e9ede4ac1bccadb7193b93530a1aeb01f Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 12 Sep 2022 12:45:08 +0530 Subject: [PATCH 03/13] Java docs Signed-off-by: Bukhtawar Khan --- .../translog/transfer/TransferService.java | 12 ++++++++++++ .../translog/transfer/TransferSnapshot.java | 17 ++++++++++++++++- .../transfer/TranslogTransferMetadata.java | 4 +++- .../transfer/listener/FileTransferListener.java | 9 +++++++++ .../listener/TranslogTransferListener.java | 11 +++++++++++ 5 files changed, 51 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 39c909fdf8f99..60db56dee6eb6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -20,12 +20,24 @@ */ public interface TransferService { + /** + * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked + * @param fileSnapshot the file snapshot to upload + * @param remotePath the remote path where upload should be made + * @param listener the callback to be invoked once upload completes successfully/fails + */ void uploadBlobAsync( final TransferFileSnapshot fileSnapshot, Iterable remotePath, ActionListener listener ); + /** + * Uploads the {@link TransferFileSnapshot} blob + * @param fileSnapshot the file snapshot to upload + * @param remotePath the remote path where upload should be made + * @throws IOException + */ void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index 3da9298dfcea0..b4c1c97f04a7d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -9,19 +9,34 @@ package org.opensearch.index.translog.transfer; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import java.util.Set; /** - * The snapshot of the files and it's metadata that is transferred to the {@link TransferService} + * The snapshot of the generational translog and checkpoint files and it's corresponding metadata that is transferred + * to the {@link TransferService} * * @opensearch.internal */ public interface TransferSnapshot { + /** + * The snapshot of the checkpoint generational files + * @return the set of {@link CheckpointFileSnapshot} + */ Set getCheckpointFileSnapshots(); + /** + * The snapshot of the translog generational files + * @return the set of {@link TranslogFileSnapshot} + */ Set getTranslogFileSnapshots(); + /** + * The translog transfer metadata of this {@link TransferSnapshot} + * @return the translog transfer metadata + */ TranslogTransferMetadata getTranslogTransferMetadata(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 4d0aca03b9b00..77aaee1d0ad20 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -19,7 +19,9 @@ import java.util.Objects; /** - * The metadata associated with every transfer {@link TransferSnapshot} + * The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the + * tranlog and generational checkpoint uploads to mark the latest generation and the translog/checkpoint files that are + * still referenced by the last checkpoint. * * @opensearch.internal */ diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java index 026e7c854d9ec..939b56f109a36 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -17,7 +17,16 @@ */ public interface FileTransferListener { + /** + * Invoked when the transfer of a single {@link TransferFileSnapshot} succeeds + * @param fileSnapshot the corresponding file snapshot + */ void onSuccess(TransferFileSnapshot fileSnapshot); + /** + * Invoked when the transfer of a single {@link TransferFileSnapshot} fails + * @param fileSnapshot the corresponding file snapshot + * @param e the exception while processing the {@link TransferFileSnapshot} + */ void onFailure(TransferFileSnapshot fileSnapshot, Exception e); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index 6c9822e6cd8eb..9a9dbddf467c0 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -19,7 +19,18 @@ */ public interface TranslogTransferListener { + /** + * Invoked when the transfer of {@link TransferSnapshot} succeeds + * @param transferSnapshot the transfer snapshot + * @throws IOException + */ void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException; + /** + * Invoked when the transfer of {@link TransferSnapshot} fails + * @param transferSnapshot the transfer snapshot + * @param ex the exception while processing the {@link TransferSnapshot} + * @throws IOException + */ void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; } From 6dade64ffe46879918294fc59e56c16490b0f53f Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 20 Sep 2022 20:10:23 +0530 Subject: [PATCH 04/13] Precommit tests Signed-off-by: Bukhtawar Khan --- .../opensearch/index/translog/Checkpoint.java | 4 + .../transfer/BlobStoreTransferService.java | 21 ++--- .../index/translog/transfer/FileSnapshot.java | 85 ++++++++++--------- .../transfer/TransferSnapshotProvider.java | 19 ++++- .../transfer/TranslogTransferManager.java | 17 ++-- .../transfer/TranslogTransferMetadata.java | 55 +++++++----- .../BlobStoreTransferServiceTests.java | 1 - .../TranslogTransferManagerTests.java | 2 + 8 files changed, 116 insertions(+), 88 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java index c7339ea1dac8a..8df574ed8374f 100644 --- a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java @@ -266,6 +266,10 @@ public long getMinTranslogGeneration() { return minTranslogGeneration; } + public long getGeneration() { + return generation; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 014e94673b477..0aede52939a1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -17,8 +17,8 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.concurrent.ExecutorService; /** @@ -47,14 +47,9 @@ public void uploadBlobAsync( assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; executorService.execute(ActionRunnable.wrap(listener, l -> { - try { + try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath) - .writeBlobAtomic( - fileSnapshot.getName(), - new ByteArrayInputStream(fileSnapshot.getContent()), - fileSnapshot.getContentLength(), - true - ); + .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); l.onResponse(fileSnapshot); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -67,14 +62,8 @@ public void uploadBlobAsync( public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; - try { - blobStore.blobContainer(blobPath) - .writeBlobAtomic( - fileSnapshot.getName(), - new ByteArrayInputStream(fileSnapshot.getContent()), - fileSnapshot.getContentLength(), - true - ); + try (InputStream inputStream = fileSnapshot.inputStream()) { + blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); } catch (Exception ex) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); throw ex; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index bc12cc7fe237b..c7f5dab2e3bf7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -9,49 +9,47 @@ package org.opensearch.index.translog.transfer; import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.translog.BufferedChecksumStreamInput; -import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.IOException; -import java.nio.file.Files; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Objects; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; /** * Snapshot of a single file that gets transferred * * @opensearch.internal */ -public class FileSnapshot { +public class FileSnapshot implements Closeable { - private final long checksum; - private final byte[] content; private final String name; - private final long contentLength; + @Nullable + private final FileChannel fileChannel; @Nullable private Path path; + @Nullable + private byte[] content; public FileSnapshot(Path path) throws IOException { Objects.requireNonNull(path); - this.name = path.getFileName().toString(); + this.name = path.toString(); this.path = path; - try (CheckedInputStream stream = new CheckedInputStream(Files.newInputStream(path), new CRC32())) { - this.content = stream.readAllBytes(); - this.checksum = stream.getChecksum().getValue(); - this.contentLength = content.length; - } + this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); } public FileSnapshot(String name, byte[] content) throws IOException { - Objects.requireNonNull(content); Objects.requireNonNull(name); this.name = name; - try (CheckedInputStream stream = new CheckedInputStream(new ByteArrayInputStream(content), new CRC32())) { - this.content = stream.readAllBytes(); - this.checksum = stream.getChecksum().getValue(); - this.contentLength = content.length; - } + this.content = content; + this.fileChannel = null; } public Path getPath() { @@ -62,21 +60,22 @@ public String getName() { return name; } - public byte[] getContent() { - return content; + public long getContentLength() throws IOException { + return fileChannel == null ? fileChannel.size() : content.length; } - public long getChecksum() { - return checksum; - } - - public long getContentLength() { - return contentLength; + public InputStream inputStream() throws IOException { + return fileChannel != null + ? new BufferedChecksumStreamInput( + new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()), + path.toString() + ) + : new BufferedChecksumStreamInput(new BytesStreamInput(content), name); } @Override public int hashCode() { - return Objects.hash(name, path, checksum, contentLength); + return Objects.hash(name, content, path); } @Override @@ -85,9 +84,8 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; FileSnapshot other = (FileSnapshot) o; return Objects.equals(this.name, other.name) - && Objects.equals(this.path, other.path) - && Objects.equals(this.checksum, other.checksum) - && Objects.equals(this.contentLength, other.contentLength); + && Objects.equals(this.content, other.content) + && Objects.equals(this.path, other.path); } @Override @@ -96,14 +94,15 @@ public String toString() { .append(name) .append(", path = ") .append(path.toUri()) - .append(", checksum = ") - .append(checksum) - .append(", contentLength = ") - .append(contentLength) .append("]") .toString(); } + @Override + public void close() throws IOException { + IOUtils.close(fileChannel); + } + public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; @@ -171,11 +170,18 @@ public boolean equals(Object o) { public static class CheckpointFileSnapshot extends TransferFileSnapshot { + private final long generation; + private final long minTranslogGeneration; - public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, Path path) throws IOException { + public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException { super(path, primaryTerm); this.minTranslogGeneration = minTranslogGeneration; + this.generation = generation; + } + + public long getGeneration() { + return generation; } public long getMinTranslogGeneration() { @@ -184,7 +190,7 @@ public long getMinTranslogGeneration() { @Override public int hashCode() { - return Objects.hash(minTranslogGeneration, super.hashCode()); + return Objects.hash(generation, minTranslogGeneration, super.hashCode()); } @Override @@ -193,7 +199,8 @@ public boolean equals(Object o) { if (this == o) return true; if (getClass() != o.getClass()) return false; CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; - return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration); + return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration) + && Objects.equals(this.generation, other.generation); } return false; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java index e8bfe84734ed1..b9c0f18dce1c4 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java @@ -14,11 +14,13 @@ import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.LongStream; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; @@ -45,11 +47,12 @@ public TransferSnapshotProvider( final long readerGeneration = reader.getGeneration(); final long readerPrimaryTerm = reader.getPrimaryTerm(); final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); + final long checkpointGeneration = reader.getCheckpoint().getGeneration(); Path translogPath = reader.path(); Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); translogTransferSnapshot.add( new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), - new CheckpointFileSnapshot(readerPrimaryTerm, minTranslogGeneration, checkpointPath) + new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath) ); } } @@ -63,32 +66,46 @@ static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { private final Set> translogCheckpointFileInfoTupleSet; private final int size; + private final List generations; private CheckpointFileSnapshot latestCheckPointFileSnapshot; private TranslogFileSnapshot latestTranslogFileSnapshot; private final long generation; private long highestGeneration; + private long lowestGeneration; private final long primaryTerm; TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { translogCheckpointFileInfoTupleSet = new HashSet<>(size); this.size = size; + this.generations = new LinkedList<>(); this.generation = generation; this.primaryTerm = primaryTerm; + this.highestGeneration = Long.MIN_VALUE; + this.lowestGeneration = Long.MAX_VALUE; } private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); + assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration(); + generations.add(translogFileSnapshot.getGeneration()); if (highestGeneration < translogFileSnapshot.getGeneration()) { latestCheckPointFileSnapshot = checkPointFileSnapshot; latestTranslogFileSnapshot = translogFileSnapshot; highestGeneration = translogFileSnapshot.getGeneration(); } + this.lowestGeneration = Math.min(lowestGeneration, translogFileSnapshot.getGeneration()); } private void assertInvariants() { assert this.primaryTerm == latestTranslogFileSnapshot.getPrimaryTerm() : "inconsistent primary term"; assert this.generation == highestGeneration : "inconsistent generation"; assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count"; + assert highestGeneration <= lowestGeneration : "lowest generation is greater than highest generation"; + assert LongStream.iterate(lowestGeneration, i -> i + 1) + .limit(highestGeneration) + .boxed() + .collect(Collectors.toList()) + .equals(generations.stream().sorted().collect(Collectors.toList())) == true : "generation gaps found"; } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index edf8bbeb584a8..057956b0dbc57 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -14,10 +14,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -132,16 +131,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); - TransferFileSnapshot fileSnapshot; - try (BytesStreamOutput output = new BytesStreamOutput()) { - translogTransferMetadata.writeTo(output); - fileSnapshot = new TransferFileSnapshot( - translogTransferMetadata.getMetadataFileName(), - BytesReference.toBytes(output.bytes()), - -1 - ); + TransferFileSnapshot fileSnapshot = new TransferFileSnapshot( + translogTransferMetadata.getFileName(), + translogTransferMetadata.createMetadataBytes(), + translogTransferMetadata.getPrimaryTerm() + ); - } return fileSnapshot; } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 77aaee1d0ad20..0aae773f593fd 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -8,10 +8,12 @@ package org.opensearch.index.translog.transfer; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.util.SetOnce; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; import java.io.IOException; import java.util.Arrays; @@ -25,7 +27,7 @@ * * @opensearch.internal */ -public class TranslogTransferMetadata implements Writeable { +public class TranslogTransferMetadata { private final long primaryTerm; @@ -37,10 +39,16 @@ public class TranslogTransferMetadata implements Writeable { private final int count; - private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); private static final String METADATA_SEPARATOR = "__"; + private static final int BUFFER_SIZE = 4096; + + private static final int CURRENT_VERSION = 1; + + private static final String METADATA_CODEC = "md"; + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { this.primaryTerm = primaryTerm; this.generation = generation; @@ -49,16 +57,6 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans this.count = count; } - TranslogTransferMetadata(StreamInput in) throws IOException { - this.primaryTerm = in.readLong(); - this.generation = in.readLong(); - this.minTranslogGeneration = in.readLong(); - this.count = in.readInt(); - this.timeStamp = in.readLong(); - this.generationToPrimaryTermMapper.set(in.readMap()); - - } - public long getPrimaryTerm() { return primaryTerm; } @@ -75,17 +73,35 @@ public int getCount() { return count; } - public void setGenerationToPrimaryTermMapper(Map generationToPrimaryTermMap) { + public void setGenerationToPrimaryTermMapper(Map generationToPrimaryTermMap) { generationToPrimaryTermMapper.set(generationToPrimaryTermMap); } - public String getMetadataFileName() { + public String getFileName() { return String.join( METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation), String.valueOf(timeStamp)) ); } + public byte[] createMetadataBytes() throws IOException { + try (BytesStreamOutput output = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "translog transfer metadata " + primaryTerm, + getFileName(), + output, + BUFFER_SIZE + ) + ) { + CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION); + write(indexOutput); + CodecUtil.writeFooter(indexOutput); + } + return BytesReference.toBytes(output.bytes()); + } + } + @Override public int hashCode() { return Objects.hash(primaryTerm, generation, timeStamp); @@ -101,12 +117,11 @@ public boolean equals(Object o) { && Objects.equals(this.timeStamp, other.timeStamp); } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(DataOutput out) throws IOException { out.writeLong(primaryTerm); out.writeLong(generation); out.writeLong(minTranslogGeneration); out.writeLong(timeStamp); - out.writeMap(generationToPrimaryTermMapper.get()); + out.writeMapOfStrings(generationToPrimaryTermMapper.get()); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 01942fe9e96a4..a52f882790e81 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -66,7 +66,6 @@ public void testUploadBlobAsync() throws IOException, InterruptedException { public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { assert succeeded.compareAndSet(false, true); assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); - assertEquals(transferFileSnapshot.getChecksum(), fileSnapshot.getChecksum()); assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index d662333fb32df..d0f6bd86c032d 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -102,11 +102,13 @@ public Set getCheckpointFileSnapshots() { return Set.of( new CheckpointFileSnapshot( primaryTerm, + generation, minTranslogGeneration, createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX) ), new CheckpointFileSnapshot( primaryTerm, + generation, minTranslogGeneration, createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX) ) From 0c1deac5834735168a8179df96bc37692d70fed6 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 18 Oct 2022 12:19:44 +0530 Subject: [PATCH 05/13] Fix unit tests Signed-off-by: Sachin Kale --- .../translog/BufferedChecksumStreamInput.java | 7 ++++++- .../index/translog/transfer/FileSnapshot.java | 4 ++-- .../BlobStoreTransferServiceTests.java | 18 +++++++++--------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java b/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java index 5feb994171b65..f299da0c1ac1e 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java +++ b/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java @@ -36,6 +36,7 @@ import org.opensearch.common.io.stream.FilterStreamInput; import org.opensearch.common.io.stream.StreamInput; +import java.io.EOFException; import java.io.IOException; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -117,7 +118,11 @@ public void reset() throws IOException { @Override public int read() throws IOException { - return readByte() & 0xFF; + try { + return readByte() & 0xFF; + } catch (EOFException e) { + return -1; + } } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index c7f5dab2e3bf7..87076d7acf601 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -40,7 +40,7 @@ public class FileSnapshot implements Closeable { public FileSnapshot(Path path) throws IOException { Objects.requireNonNull(path); - this.name = path.toString(); + this.name = path.getFileName().toString(); this.path = path; this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); } @@ -61,7 +61,7 @@ public String getName() { } public long getContentLength() throws IOException { - return fileChannel == null ? fileChannel.size() : content.length; + return fileChannel == null ? content.length : fileChannel.size(); } public InputStream inputStream() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index a52f882790e81..cf69f13e2a89e 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -24,7 +24,9 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,20 +47,18 @@ public void setUp() throws Exception { } public void testUploadBlob() throws IOException { - FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( - createTempFile(), - randomNonNegativeLong() - ); + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); transferService.uploadBlob(transferFileSnapshot, repository.basePath()); } public void testUploadBlobAsync() throws IOException, InterruptedException { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); AtomicBoolean succeeded = new AtomicBoolean(false); - FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( - createTempFile(), - randomNonNegativeLong() - ); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); CountDownLatch latch = new CountDownLatch(1); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); transferService.uploadBlobAsync(transferFileSnapshot, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { @@ -74,7 +74,7 @@ public void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } }, latch)); - latch.await(100, TimeUnit.MILLISECONDS); + latch.await(1000, TimeUnit.MILLISECONDS); assertEquals(true, succeeded.get()); } From 985341a7a4269bbf93b614be897d024461da21ab Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 20 Oct 2022 10:38:48 +0530 Subject: [PATCH 06/13] Fix TranslogTransferManagerTests Signed-off-by: Sachin Kale --- .../index/translog/transfer/TranslogTransferManagerTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index d0f6bd86c032d..60b7029f18fa6 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.transfer; +import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobPath; @@ -28,6 +29,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +@LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { private TransferService transferService; @@ -77,6 +79,7 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { }, r -> r ); + assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @Override public void onUploadComplete(TransferSnapshot transferSnapshot) { From 4eec4f6275be6c8dc6c2cf0cc49bcfaad2ef63d4 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 21 Oct 2022 09:53:50 +0530 Subject: [PATCH 07/13] Add missing javadocs Signed-off-by: Sachin Kale --- .../index/translog/transfer/FileSnapshot.java | 15 +++++++++++++++ .../translog/transfer/listener/package-info.java | 10 ++++++++++ .../index/translog/transfer/package-info.java | 10 ++++++++++ 3 files changed, 35 insertions(+) create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/package-info.java diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 87076d7acf601..926c07ba0fed7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -103,6 +103,11 @@ public void close() throws IOException { IOUtils.close(fileChannel); } + /** + * Snapshot of a single file with primary term that gets transferred + * + * @opensearch.internal + */ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; @@ -138,6 +143,11 @@ public boolean equals(Object o) { } } + /** + * Snapshot of a single .tlg file that gets transferred + * + * @opensearch.internal + */ public static class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; @@ -168,6 +178,11 @@ public boolean equals(Object o) { } } + /** + * Snapshot of a single .ckp file that gets transferred + * + * @opensearch.internal + */ public static class CheckpointFileSnapshot extends TransferFileSnapshot { private final long generation; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java new file mode 100644 index 0000000000000..edb7f453515b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Core classes responsible for handling all translog operations */ +package org.opensearch.index.translog.transfer.listener; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/package-info.java b/server/src/main/java/org/opensearch/index/translog/transfer/package-info.java new file mode 100644 index 0000000000000..2ac96b01b0673 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Core classes responsible for handling all translog operations */ +package org.opensearch.index.translog.transfer; From 6b7df9620c3d93843e7e74e14351420810cbe3b6 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sat, 12 Nov 2022 16:24:33 +0530 Subject: [PATCH 08/13] Modified TransferSnapshotProvider to use Builder Signed-off-by: Bukhtawar Khan --- .../transfer/TransferSnapshotProvider.java | 141 ----------------- .../TranslogCheckpointTransferSnapshot.java | 145 ++++++++++++++++++ 2 files changed, 145 insertions(+), 141 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java deleted file mode 100644 index b9c0f18dce1c4..0000000000000 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.translog.transfer; - -import org.opensearch.common.collect.Tuple; -import org.opensearch.index.translog.TranslogReader; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.LongStream; - -import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; -import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; -import static org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; - -/** - * Provider for a {@link TransferSnapshot} which builds the snapshot from the translog and checkpoint files present on the local-disk - * - * @opensearch.internal - */ -public class TransferSnapshotProvider implements Supplier { - - private final TranslogCheckpointTransferSnapshot translogTransferSnapshot; - - public TransferSnapshotProvider( - long primaryTerm, - long generation, - Path location, - List readers, - Function checkpointGenFileNameMapper - ) throws IOException { - translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(primaryTerm, generation, readers.size()); - for (TranslogReader reader : readers) { - final long readerGeneration = reader.getGeneration(); - final long readerPrimaryTerm = reader.getPrimaryTerm(); - final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); - final long checkpointGeneration = reader.getCheckpoint().getGeneration(); - Path translogPath = reader.path(); - Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); - translogTransferSnapshot.add( - new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), - new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath) - ); - } - } - - public TranslogCheckpointTransferSnapshot get() { - translogTransferSnapshot.assertInvariants(); - return translogTransferSnapshot; - } - - static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { - - private final Set> translogCheckpointFileInfoTupleSet; - private final int size; - private final List generations; - private CheckpointFileSnapshot latestCheckPointFileSnapshot; - private TranslogFileSnapshot latestTranslogFileSnapshot; - private final long generation; - private long highestGeneration; - private long lowestGeneration; - private final long primaryTerm; - - TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { - translogCheckpointFileInfoTupleSet = new HashSet<>(size); - this.size = size; - this.generations = new LinkedList<>(); - this.generation = generation; - this.primaryTerm = primaryTerm; - this.highestGeneration = Long.MIN_VALUE; - this.lowestGeneration = Long.MAX_VALUE; - } - - private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { - translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); - assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration(); - generations.add(translogFileSnapshot.getGeneration()); - if (highestGeneration < translogFileSnapshot.getGeneration()) { - latestCheckPointFileSnapshot = checkPointFileSnapshot; - latestTranslogFileSnapshot = translogFileSnapshot; - highestGeneration = translogFileSnapshot.getGeneration(); - } - this.lowestGeneration = Math.min(lowestGeneration, translogFileSnapshot.getGeneration()); - } - - private void assertInvariants() { - assert this.primaryTerm == latestTranslogFileSnapshot.getPrimaryTerm() : "inconsistent primary term"; - assert this.generation == highestGeneration : "inconsistent generation"; - assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count"; - assert highestGeneration <= lowestGeneration : "lowest generation is greater than highest generation"; - assert LongStream.iterate(lowestGeneration, i -> i + 1) - .limit(highestGeneration) - .boxed() - .collect(Collectors.toList()) - .equals(generations.stream().sorted().collect(Collectors.toList())) == true : "generation gaps found"; - } - - @Override - public Set getTranslogFileSnapshots() { - return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); - } - - @Override - public TranslogTransferMetadata getTranslogTransferMetadata() { - return new TranslogTransferMetadata( - latestTranslogFileSnapshot.getPrimaryTerm(), - latestTranslogFileSnapshot.getGeneration(), - latestCheckPointFileSnapshot.getMinTranslogGeneration(), - translogCheckpointFileInfoTupleSet.size() * 2 - ); - } - - @Override - public Set getCheckpointFileSnapshots() { - return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v2).collect(Collectors.toSet()); - } - - @Override - public String toString() { - return new StringBuilder("TranslogTransferSnapshot [").append(" primary term = ") - .append(primaryTerm) - .append(", generation = ") - .append(generation) - .append(" ]") - .toString(); - } - } -} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java new file mode 100644 index 0000000000000..dd1ff8a3ac33a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.index.translog.TranslogReader; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; + +/** + * Implementation for a {@link TransferSnapshot} which builds the snapshot from the translog and checkpoint files present on the local-disk + * + * @opensearch.internal + */ +public class TranslogCheckpointTransferSnapshot implements TransferSnapshot { + + private final Set> translogCheckpointFileInfoTupleSet; + private final int size; + private final long generation; + private final long primaryTerm; + private long minTranslogGeneration; + + TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { + translogCheckpointFileInfoTupleSet = new HashSet<>(size); + this.size = size; + this.generation = generation; + this.primaryTerm = primaryTerm; + } + + private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { + translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); + assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration(); + } + + private void setMinTranslogGeneration(long minTranslogGeneration) { + this.minTranslogGeneration = minTranslogGeneration; + } + + @Override + public Set getTranslogFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); + } + + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, translogCheckpointFileInfoTupleSet.size() * 2); + } + + @Override + public Set getCheckpointFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v2).collect(Collectors.toSet()); + } + + @Override + public String toString() { + return new StringBuilder("TranslogTransferSnapshot [").append(" primary term = ") + .append(primaryTerm) + .append(", generation = ") + .append(generation) + .append(" ]") + .toString(); + } + + public static class Builder { + private final long primaryTerm; + private final long generation; + private final List readers; + private final Function checkpointGenFileNameMapper; + private final Path location; + + public Builder( + long primaryTerm, + long generation, + Path location, + List readers, + Function checkpointGenFileNameMapper + ) { + this.primaryTerm = primaryTerm; + this.generation = generation; + this.readers = readers; + this.checkpointGenFileNameMapper = checkpointGenFileNameMapper; + this.location = location; + } + + public TranslogCheckpointTransferSnapshot build() throws IOException { + final List generations = new LinkedList<>(); + long highestGeneration = Long.MIN_VALUE; + long highestGenPrimaryTerm = Long.MIN_VALUE; + long lowestGeneration = Long.MAX_VALUE; + long highestGenMinTranslogGeneration = Long.MIN_VALUE; + TranslogCheckpointTransferSnapshot translogTransferSnapshot = new TranslogCheckpointTransferSnapshot( + primaryTerm, + generation, + readers.size() + ); + for (TranslogReader reader : readers) { + final long readerGeneration = reader.getGeneration(); + final long readerPrimaryTerm = reader.getPrimaryTerm(); + final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); + final long checkpointGeneration = reader.getCheckpoint().getGeneration(); + Path translogPath = reader.path(); + Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); + generations.add(readerGeneration); + translogTransferSnapshot.add( + new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), + new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath) + ); + if (readerGeneration > highestGeneration) { + highestGeneration = readerGeneration; + highestGenMinTranslogGeneration = minTranslogGeneration; + highestGenPrimaryTerm = readerPrimaryTerm; + } + lowestGeneration = Math.min(lowestGeneration, readerGeneration); + } + translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration); + + assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term"; + assert this.generation == highestGeneration : "inconsistent generation"; + assert LongStream.iterate(lowestGeneration, i -> i + 1) + .limit(highestGeneration) + .boxed() + .collect(Collectors.toList()) + .equals(generations.stream().sorted().collect(Collectors.toList())) == true : "generation gaps found"; + return translogTransferSnapshot; + } + } +} From 0d834be4989b5f265b41d8069cf84383adfab8ba Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 15 Nov 2022 17:07:35 +0530 Subject: [PATCH 09/13] Added Java doc Signed-off-by: Bukhtawar Khan --- .../translog/transfer/TranslogCheckpointTransferSnapshot.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index dd1ff8a3ac33a..30b81627614b7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -79,6 +79,9 @@ public String toString() { .toString(); } + /** + * Builder for {@link TranslogCheckpointTransferSnapshot} + */ public static class Builder { private final long primaryTerm; private final long generation; From 09ab574d54693e2905a954b3777b83440f93a678 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 15 Nov 2022 22:37:28 +0530 Subject: [PATCH 10/13] Added Java doc Signed-off-by: Bukhtawar Khan --- .../opensearch/index/translog/transfer/TransferService.java | 2 +- .../translog/transfer/listener/TranslogTransferListener.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 60db56dee6eb6..ed6c185352833 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -36,7 +36,7 @@ void uploadBlobAsync( * Uploads the {@link TransferFileSnapshot} blob * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made - * @throws IOException + * @throws IOException the exception while transferring the data */ void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index 9a9dbddf467c0..c09fd8798e505 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -22,7 +22,7 @@ public interface TranslogTransferListener { /** * Invoked when the transfer of {@link TransferSnapshot} succeeds * @param transferSnapshot the transfer snapshot - * @throws IOException + * @throws IOException the exception during the transfer of data */ void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException; @@ -30,7 +30,7 @@ public interface TranslogTransferListener { * Invoked when the transfer of {@link TransferSnapshot} fails * @param transferSnapshot the transfer snapshot * @param ex the exception while processing the {@link TransferSnapshot} - * @throws IOException + * @throws IOException the exception during the transfer of data */ void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; } From 0bc8a300191df9d024009b0cf7e635eb6c92df1d Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 15 Nov 2022 23:50:27 +0530 Subject: [PATCH 11/13] Handle exception of file transfer Signed-off-by: Bukhtawar Khan --- .../transfer/TranslogTransferManager.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 057956b0dbc57..f557389b68448 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -99,18 +99,24 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans throw ex; } } catch (InterruptedException ex) { - logger.error(() -> new ParameterizedMessage("Time failed for snapshot {}", transferSnapshot), ex); exceptionList.forEach(ex::addSuppressed); + logger.error(() -> new ParameterizedMessage("Time failed for snapshot {}", transferSnapshot), ex); Thread.currentThread().interrupt(); throw ex; } - final TransferFileSnapshot transferFileSnapshot = prepareMetadata(transferSnapshot); - transferService.uploadBlob( - prepareMetadata(transferSnapshot), - remoteBaseTransferPath.add(String.valueOf(transferFileSnapshot.getPrimaryTerm())) - ); - translogTransferListener.onUploadComplete(transferSnapshot); - return true; + if (exceptionList.isEmpty()) { + final TransferFileSnapshot transferFileSnapshot = prepareMetadata(transferSnapshot); + transferService.uploadBlob( + prepareMetadata(transferSnapshot), + remoteBaseTransferPath.add(String.valueOf(transferFileSnapshot.getPrimaryTerm())) + ); + translogTransferListener.onUploadComplete(transferSnapshot); + return true; + } else { + Exception ex = new RuntimeException("Failed to upload some files during transfer"); + exceptionList.forEach(ex::addSuppressed); + throw ex; + } } catch (Exception ex) { logger.error(() -> new ParameterizedMessage("Transfer failed for snapshot {}", transferSnapshot), ex); translogTransferListener.onUploadFailed(transferSnapshot, ex); From 8a5da201bad25b1757e0fac06563dbfdd864f308 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sat, 19 Nov 2022 14:04:57 +0530 Subject: [PATCH 12/13] PR review comments Signed-off-by: Bukhtawar Khan --- .../index/translog/transfer/BlobStoreTransferService.java | 1 - .../opensearch/index/translog/transfer/FileSnapshot.java | 8 ++++---- .../index/translog/transfer/FileTransferException.java | 5 ----- .../index/translog/transfer/TranslogTransferManager.java | 1 - .../translog/transfer/BlobStoreTransferServiceTests.java | 4 ++-- 5 files changed, 6 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 0aede52939a1a..36d9d71217837 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -65,7 +65,6 @@ public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); } catch (Exception ex) { - logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); throw ex; } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 926c07ba0fed7..e8c06e3d251c7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -38,14 +38,14 @@ public class FileSnapshot implements Closeable { @Nullable private byte[] content; - public FileSnapshot(Path path) throws IOException { + private FileSnapshot(Path path) throws IOException { Objects.requireNonNull(path); this.name = path.getFileName().toString(); this.path = path; this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); } - public FileSnapshot(String name, byte[] content) throws IOException { + private FileSnapshot(String name, byte[] content) { Objects.requireNonNull(name); this.name = name; this.content = content; @@ -148,7 +148,7 @@ public boolean equals(Object o) { * * @opensearch.internal */ - public static class TranslogFileSnapshot extends TransferFileSnapshot { + public static final class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; @@ -183,7 +183,7 @@ public boolean equals(Object o) { * * @opensearch.internal */ - public static class CheckpointFileSnapshot extends TransferFileSnapshot { + public static final class CheckpointFileSnapshot extends TransferFileSnapshot { private final long generation; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java index 7b30be7ca639a..89a4135d2409b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java @@ -24,11 +24,6 @@ public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) this.fileSnapshot = fileSnapshot; } - public FileTransferException(TransferFileSnapshot fileSnapshot, String message, Throwable cause) { - super(message, cause); - this.fileSnapshot = fileSnapshot; - } - public TransferFileSnapshot getFileSnapshot() { return fileSnapshot; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index f557389b68448..02ebab8ed6826 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -100,7 +100,6 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans } } catch (InterruptedException ex) { exceptionList.forEach(ex::addSuppressed); - logger.error(() -> new ParameterizedMessage("Time failed for snapshot {}", transferSnapshot), ex); Thread.currentThread().interrupt(); throw ex; } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index cf69f13e2a89e..adca47bf64c64 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -74,8 +74,8 @@ public void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } }, latch)); - latch.await(1000, TimeUnit.MILLISECONDS); - assertEquals(true, succeeded.get()); + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(succeeded.get()); } @Override From cc4d85dd2e9c22aa56e89dcc0754bb1412e4ea54 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 21 Nov 2022 18:42:51 +0530 Subject: [PATCH 13/13] Update changelog Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0d81f455fe28..9b8e65c548712 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,7 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Relax visibility of the HTTP_CHANNEL_KEY and HTTP_SERVER_CHANNEL_KEY to make it possible for the plugins to access associated Netty4HttpChannel / Netty4HttpServerChannel instance ([#4638](https://github.com/opensearch-project/OpenSearch/pull/4638)) - Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725)) - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) -- Introduce remote translog transfer support([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) +- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) ### Deprecated