diff --git a/CHANGELOG.md b/CHANGELOG.md index a10824a56af05..5f39a64534057 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156)) - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) +- Introduce remote translog transfer support([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 86e26d56555a1..014e94673b477 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -39,7 +39,7 @@ public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorSer } @Override - public void uploadFileAsync( + public void uploadBlobAsync( final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, ActionListener listener @@ -64,7 +64,7 @@ public void uploadFileAsync( } @Override - public void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { + public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; try { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 4658ace529cea..7f491470f5d49 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -10,8 +10,13 @@ import org.opensearch.common.Nullable; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Objects; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; /** * Snapshot of a single file that gets transferred @@ -27,19 +32,26 @@ public class FileSnapshot { @Nullable private Path path; - public FileSnapshot(String name, Path path, long checksum, byte[] content) { - this.name = name; + public FileSnapshot(Path path) throws IOException { + Objects.requireNonNull(path); + this.name = path.getFileName().toString(); this.path = path; - this.checksum = checksum; - this.content = content; - this.contentLength = content.length; + try (CheckedInputStream stream = new CheckedInputStream(Files.newInputStream(path), new CRC32())) { + this.content = stream.readAllBytes(); + this.checksum = stream.getChecksum().getValue(); + this.contentLength = content.length; + } } - public FileSnapshot(String name, long checksum, byte[] content) { + public FileSnapshot(String name, byte[] content) throws IOException { + Objects.requireNonNull(content); + Objects.requireNonNull(name); this.name = name; - this.checksum = checksum; - this.content = content; - this.contentLength = content.length; + try (CheckedInputStream stream = new CheckedInputStream(new ByteArrayInputStream(content), new CRC32())) { + this.content = stream.readAllBytes(); + this.checksum = stream.getChecksum().getValue(); + this.contentLength = content.length; + } } public Path getPath() { @@ -80,20 +92,29 @@ public boolean equals(Object o) { @Override public String toString() { - return new StringBuilder("FileInfo [").append(name).append(path.toUri()).append(checksum).append(contentLength).toString(); + return new StringBuilder("FileInfo [").append(" name = ") + .append(name) + .append(", path = ") + .append(path.toUri()) + .append(", checksum = ") + .append(checksum) + .append(", contentLength = ") + .append(contentLength) + .append("]") + .toString(); } public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; - public TransferFileSnapshot(String name, Path path, long checksum, byte[] content, long primaryTerm) { - super(name, path, checksum, content); + public TransferFileSnapshot(Path path, long primaryTerm) throws IOException { + super(path); this.primaryTerm = primaryTerm; } - public TransferFileSnapshot(String name, long checksum, byte[] content, long primaryTerm) { - super(name, checksum, content); + public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { + super(name, content); this.primaryTerm = primaryTerm; } @@ -122,8 +143,8 @@ public static class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; - public TranslogFileSnapshot(long primaryTerm, long generation, String name, Path path, long checksum, byte[] content) { - super(name, path, checksum, content, primaryTerm); + public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException { + super(path, primaryTerm); this.generation = generation; } @@ -152,8 +173,8 @@ public static class CheckpointFileSnapshot extends TransferFileSnapshot { private final long minTranslogGeneration; - public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, String name, Path path, long checksum, byte[] content) { - super(name, path, checksum, content, primaryTerm); + public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, Path path) throws IOException { + super(path, primaryTerm); this.minTranslogGeneration = minTranslogGeneration; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 29382a0318aba..39c909fdf8f99 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -20,12 +20,12 @@ */ public interface TransferService { - void uploadFileAsync( + void uploadBlobAsync( final TransferFileSnapshot fileSnapshot, Iterable remotePath, ActionListener listener ); - void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java index 373cb732cf0d3..7cd188f056a54 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java @@ -11,8 +11,6 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.index.translog.TranslogReader; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; @@ -21,8 +19,6 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; @@ -42,7 +38,7 @@ public TransferSnapshotProvider( long generation, Path location, List readers, - Function checkpointFileNameMapper + Function checkpointGenFileNameMapper ) throws IOException { translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(primaryTerm, generation, readers.size()); for (TranslogReader reader : readers) { @@ -50,10 +46,10 @@ public TransferSnapshotProvider( final long readerPrimaryTerm = reader.getPrimaryTerm(); final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); Path translogPath = reader.path(); - Path checkpointPath = location.resolve(checkpointFileNameMapper.apply(readerGeneration)); + Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); translogTransferSnapshot.add( - buildTranslogFileInfo(translogPath.toFile(), readerPrimaryTerm, readerGeneration), - buildCheckpointFileInfo(checkpointPath.toFile(), readerPrimaryTerm, minTranslogGeneration) + new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), + new CheckpointFileSnapshot(readerPrimaryTerm, minTranslogGeneration, checkpointPath) ); } } @@ -63,26 +59,6 @@ public TranslogCheckpointTransferSnapshot get() { return translogTransferSnapshot; } - private TranslogFileSnapshot buildTranslogFileInfo(File file, long primaryTerm, long generation) throws IOException { - TranslogFileSnapshot fileSnapshot; - try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { - byte[] content = stream.readAllBytes(); - long checksum = stream.getChecksum().getValue(); - fileSnapshot = new TranslogFileSnapshot(primaryTerm, generation, file.getName(), file.toPath(), checksum, content); - } - return fileSnapshot; - } - - private CheckpointFileSnapshot buildCheckpointFileInfo(File file, long primaryTerm, long minTranslogGeneration) throws IOException { - CheckpointFileSnapshot fileSnapshot; - try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { - byte[] content = stream.readAllBytes(); - long checksum = stream.getChecksum().getValue(); - fileSnapshot = new CheckpointFileSnapshot(primaryTerm, minTranslogGeneration, file.getName(), file.toPath(), checksum, content); - } - return fileSnapshot; - } - static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { private final Set> translogCheckpointFileInfoTupleSet; @@ -115,6 +91,7 @@ private void assertInvariants() { assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count"; } + @Override public Set getTranslogFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v1()).collect(Collectors.toSet()); } @@ -129,8 +106,19 @@ public TranslogTransferMetadata getTranslogTransferMetadata() { ); } + @Override public Set getCheckpointFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v2()).collect(Collectors.toSet()); } + + @Override + public String toString() { + return new StringBuilder("TranslogTransferSnapshot [").append(" primary term = ") + .append(primaryTerm) + .append(", generation = ") + .append(generation) + .append(" ]") + .toString(); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 9de1a3bd5e187..3c1e213dfa545 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -14,10 +14,10 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -29,8 +29,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; @@ -44,9 +42,9 @@ public class TranslogTransferManager { private final TransferService transferService; private final BlobPath remoteBaseTransferPath; - private final BlobPath remoteTransferMetadataPath; private final FileTransferListener fileTransferListener; private final UnaryOperator> exclusionFilter; + private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); @@ -54,31 +52,32 @@ public class TranslogTransferManager { public TranslogTransferManager( TransferService transferService, BlobPath remoteBaseTransferPath, - BlobPath remoteTransferMetadataPath, FileTransferListener fileTransferListener, UnaryOperator> exclusionFilter ) { this.transferService = transferService; this.remoteBaseTransferPath = remoteBaseTransferPath; - this.remoteTransferMetadataPath = remoteTransferMetadataPath; this.fileTransferListener = fileTransferListener; this.exclusionFilter = exclusionFilter; } - public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { + public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) + throws IOException { List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); try { Set toUpload = exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots()); toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots())); - if (toUpload.isEmpty()) { - logger.warn("Nothing to upload for transfer size {}", transferSnapshot.getTranslogTransferMetadata().getCount()); - return true; - } final CountDownLatch latch = new CountDownLatch(toUpload.size()); LatchedActionListener latchedActionListener = new LatchedActionListener( ActionListener.wrap(fileTransferListener::onSuccess, ex -> { assert ex instanceof FileTransferException; - logger.error("Exception received type {}", ex.getClass(), ex); + logger.error( + () -> new ParameterizedMessage( + "Exception during transfer for file {}", + ((FileTransferException) ex).getFileSnapshot().getName() + ), + ex + ); FileTransferException e = (FileTransferException) ex; fileTransferListener.onFailure(e.getFileSnapshot(), ex); exceptionList.add(ex); @@ -86,7 +85,7 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe latch ); toUpload.forEach( - fileSnapshot -> transferService.uploadFileAsync( + fileSnapshot -> transferService.uploadBlobAsync( fileSnapshot, remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener @@ -94,9 +93,7 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe ); try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { - Exception ex = new TimeoutException( - "Timed out waiting for transfer of generation " + transferSnapshot + " to complete" - ); + Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); exceptionList.forEach(e -> ex.addSuppressed(e)); throw ex; } @@ -106,7 +103,11 @@ public boolean uploadTranslog(TransferSnapshot transferSnapshot, TranslogTransfe Thread.currentThread().interrupt(); throw ex; } - transferService.uploadFile(prepareMetadata(transferSnapshot), remoteTransferMetadataPath); + final TransferFileSnapshot transferFileSnapshot = prepareMetadata(transferSnapshot); + transferService.uploadBlob( + prepareMetadata(transferSnapshot), + remoteBaseTransferPath.add(String.valueOf(transferFileSnapshot.getPrimaryTerm())) + ); translogTransferListener.onUploadComplete(transferSnapshot); return true; } catch (Exception ex) { @@ -133,16 +134,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) TransferFileSnapshot fileSnapshot; try (BytesStreamOutput output = new BytesStreamOutput()) { translogTransferMetadata.writeTo(output); - try ( - CheckedInputStream stream = new CheckedInputStream( - new ByteArrayInputStream(output.bytes().streamInput().readByteArray()), - new CRC32() - ) - ) { - byte[] content = stream.readAllBytes(); - long checksum = stream.getChecksum().getValue(); - fileSnapshot = new TransferFileSnapshot(translogTransferMetadata.getMetadataFileName(), checksum, content, -1); - } + fileSnapshot = new TransferFileSnapshot( + translogTransferMetadata.getMetadataFileName(), + BytesReference.toBytes(output.bytes()), + -1 + ); + } return fileSnapshot; } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java new file mode 100644 index 0000000000000..01942fe9e96a4 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BlobStoreTransferServiceTests extends OpenSearchTestCase { + + private ExecutorService executorService; + + private BlobStoreRepository repository; + + @Override + public void setUp() throws Exception { + super.setUp(); + repository = createRepository(); + executorService = Executors.newFixedThreadPool(1); + } + + public void testUploadBlob() throws IOException { + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + createTempFile(), + randomNonNegativeLong() + ); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + } + + public void testUploadBlobAsync() throws IOException, InterruptedException { + AtomicBoolean succeeded = new AtomicBoolean(false); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + createTempFile(), + randomNonNegativeLong() + ); + CountDownLatch latch = new CountDownLatch(1); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + transferService.uploadBlobAsync(transferFileSnapshot, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + assert succeeded.compareAndSet(false, true); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); + assertEquals(transferFileSnapshot.getChecksum(), fileSnapshot.getChecksum()); + assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch)); + latch.await(100, TimeUnit.MILLISECONDS); + assertEquals(true, succeeded.get()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + repository.stop(); + executorService.shutdown(); + executorService.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + + /** Create a {@link Repository} with a random name **/ + private BlobStoreRepository createRepository() { + Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final FsRepository repository = new FsRepository( + repositoryMetadata, + createEnvironment(), + xContentRegistry(), + clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually + } + }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + repository.start(); + return repository; + } + + /** Create a {@link Environment} with random path.home and path.repo **/ + private Environment createEnvironment() { + Path home = createTempDir(); + return TestEnvironment.newEnvironment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) + .build() + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java new file mode 100644 index 0000000000000..7bee9bb008bf7 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.test.OpenSearchTestCase; + +public class TranslogTransferManagerTests extends OpenSearchTestCase {}