From 05af414bb0a4bfe8c3fccd67bb2dd45694a92fe0 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Sat, 8 Jun 2024 17:22:26 +0530 Subject: [PATCH] Upload blob from input stream (#13836) * Blobstore transfer of cluster metadata from the underlying input stream Signed-off-by: Sooraj Sinha --- .../model/RemoteClusterStateBlobStore.java | 12 +- .../transfer/BlobStoreTransferService.java | 124 +++++++++++++---- .../translog/transfer/TransferService.java | 17 +++ .../BlobStoreTransferServiceTests.java | 131 ++++++++++++++++++ 4 files changed, 256 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 1aeecc4e70382..83326f65f0d43 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -9,6 +9,7 @@ package org.opensearch.gateway.remote.model; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.remote.RemoteWriteableEntity; @@ -54,15 +55,20 @@ public void writeAsync(final U entity, final ActionListener listener) { try (InputStream inputStream = entity.serialize()) { BlobPath blobPath = getBlobPathForUpload(entity); entity.setFullBlobName(blobPath); - // TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836 - // transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, - // listener); + transferService.uploadBlob( + inputStream, + getBlobPathForUpload(entity), + entity.getBlobFileName(), + WritePriority.URGENT, + listener + ); } } catch (Exception e) { listener.onFailure(e); } } + @Override public T read(final U entity) throws IOException { // TODO Add timing logs and tracing assert entity.getFullBlobName() != null; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 704f6419da60a..d55abb40dec48 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.IndexInput; import org.opensearch.action.ActionRunnable; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; @@ -19,11 +20,13 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.InputStreamWithMetadata; -import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; +import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.index.translog.ChannelFactory; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; @@ -41,6 +44,7 @@ import java.util.Set; import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; /** @@ -53,6 +57,7 @@ public class BlobStoreTransferService implements TransferService { private final BlobStore blobStore; private final ThreadPool threadPool; + private static final int CHECKSUM_BYTES_LENGTH = 8; private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { @@ -108,6 +113,40 @@ public void uploadBlobs( } + @Override + public void uploadBlob( + InputStream inputStream, + Iterable remotePath, + String fileName, + WritePriority writePriority, + ActionListener listener + ) throws IOException { + assert remotePath instanceof BlobPath; + BlobPath blobPath = (BlobPath) remotePath; + final BlobContainer blobContainer = blobStore.blobContainer(blobPath); + if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { + blobContainer.writeBlob(fileName, inputStream, inputStream.available(), false); + listener.onResponse(null); + return; + } + final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")"; + byte[] bytes = inputStream.readAllBytes(); + try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) { + long expectedChecksum = computeChecksum(input, resourceDescription); + uploadBlobAsyncInternal( + fileName, + fileName, + bytes.length, + blobPath, + writePriority, + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), + expectedChecksum, + listener, + null + ); + } + } + // Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file. static Map buildTransferFileMetadata(InputStream metadataInputStream) throws IOException { Map metadata = new HashMap<>(); @@ -150,37 +189,23 @@ private void uploadBlob( try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { contentLength = channel.size(); } - boolean remoteIntegrityEnabled = false; - BlobContainer blobContainer = blobStore.blobContainer(blobPath); - if (blobContainer instanceof AsyncMultiStreamBlobContainer) { - remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported(); - } - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); + listener.onFailure(new FileTransferException(fileSnapshot, ex)); + }); + + Objects.requireNonNull(fileSnapshot.getChecksum()); + uploadBlobAsyncInternal( fileSnapshot.getName(), fileSnapshot.getName(), contentLength, - true, + blobPath, writePriority, (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), - Objects.requireNonNull(fileSnapshot.getChecksum()), - remoteIntegrityEnabled, + fileSnapshot.getChecksum(), + completionListener, metadata ); - ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { - logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); - listener.onFailure(new FileTransferException(fileSnapshot, ex)); - }); - - completionListener = ActionListener.runBefore(completionListener, () -> { - try { - remoteTransferContainer.close(); - } catch (Exception e) { - logger.warn("Error occurred while closing streams", e); - } - }); - - WriteContext writeContext = remoteTransferContainer.createWriteContext(); - ((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -195,6 +220,40 @@ private void uploadBlob( } + private void uploadBlobAsyncInternal( + String fileName, + String remoteFileName, + long contentLength, + BlobPath blobPath, + WritePriority writePriority, + RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier, + long expectedChecksum, + ActionListener completionListener, + Map metadata + ) throws IOException { + BlobContainer blobContainer = blobStore.blobContainer(blobPath); + assert blobContainer instanceof AsyncMultiStreamBlobContainer; + boolean remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported(); + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileName, + remoteFileName, + contentLength, + true, + writePriority, + inputStreamSupplier, + expectedChecksum, + remoteIntegrityEnabled, + metadata + ) + ) { + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload( + remoteTransferContainer.createWriteContext(), + completionListener + ); + } + } + @Override public InputStream downloadBlob(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlob(fileName); @@ -276,4 +335,19 @@ public void listAllInSortedOrderAsync( threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); }); } + private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException { + long expectedChecksum; + try { + expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum", + resourceDescription, + e + ); + } + return expectedChecksum; + } + } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 0ff983739438b..1182c626fb0e9 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -66,6 +66,23 @@ void uploadBlobs( */ void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath, WritePriority writePriority) throws IOException; + /** + * Reads the input stream and uploads as a blob + * @param inputStream the stream to read from + * @param remotePath the remote path where upload should be made + * @param blobName the name of blob file + * @param writePriority Priority by which content needs to be written. + * @param listener the callback to be invoked once uploads complete successfully/fail + * @throws IOException the exception thrown while uploading + */ + void uploadBlob( + InputStream inputStream, + Iterable remotePath, + String blobName, + WritePriority writePriority, + ActionListener listener + ) throws IOException; + void deleteBlobs(Iterable path, List fileNames) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 3ed90c0837663..cd78aead80923 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -8,19 +8,33 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.read.ReadContext; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.ToXContent; import org.opensearch.env.Environment; import org.opensearch.env.TestEnvironment; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -29,6 +43,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -40,6 +55,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class BlobStoreTransferServiceTests extends OpenSearchTestCase { @@ -110,6 +128,70 @@ public void onFailure(Exception e) { assertTrue(succeeded.get()); } + public void testUploadBlobFromInputStreamSyncFSRepo() throws IOException, InterruptedException { + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + uploadBlobFromInputStream(transferService); + } + + public void testUploadBlobFromInputStreamAsyncFSRepo() throws IOException, InterruptedException { + BlobStore blobStore = createTestBlobStore(); + MockAsyncFsContainer mockAsyncFsContainer = new MockAsyncFsContainer((FsBlobStore) blobStore, BlobPath.cleanPath(), null); + FsBlobStore fsBlobStore = mock(FsBlobStore.class); + when(fsBlobStore.blobContainer(any())).thenReturn(mockAsyncFsContainer); + + TransferService transferService = new BlobStoreTransferService(fsBlobStore, threadPool); + uploadBlobFromInputStream(transferService); + } + + private IndexMetadata getIndexMetadata() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + return new IndexMetadata.Builder(index.getName()).settings(idxSettings).version(5L).numberOfShards(1).numberOfReplicas(0).build(); + } + + private void uploadBlobFromInputStream(TransferService transferService) throws IOException, InterruptedException { + TestClass testObject = new TestClass("field1", "value1"); + AtomicBoolean succeeded = new AtomicBoolean(false); + ChecksumBlobStoreFormat blobStoreFormat = new ChecksumBlobStoreFormat<>( + "coordination", + "%s", + IndexMetadata::fromXContent + ); + IndexMetadata indexMetadata = getIndexMetadata(); + try ( + InputStream inputStream = blobStoreFormat.serialize( + indexMetadata, + "index-metadata", + new NoneCompressor(), + new ToXContent.MapParams(Map.of()) + ).streamInput() + ) { + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(TestClass testObject) { + assert succeeded.compareAndSet(false, true); + assert testObject.name.equals("field1"); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch); + ActionListener completionListener = ActionListener.wrap( + resp -> listener.onResponse(testObject), + ex -> listener.onFailure(ex) + ); + transferService.uploadBlob(inputStream, repository.basePath(), "test-object", WritePriority.URGENT, completionListener); + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(succeeded.get()); + } + } + @Override public void tearDown() throws Exception { super.tearDown(); @@ -141,6 +223,10 @@ protected void assertSnapshotOrGenericThread() { return repository; } + private BlobStore createTestBlobStore() throws IOException { + return new FsBlobStore(randomIntBetween(1, 8) * 1024, createTempDir(), false); + } + /** Create a {@link Environment} with random path.home and path.repo **/ private Environment createEnvironment() { Path home = createTempDir(); @@ -184,4 +270,49 @@ public void testBuildTransferFileMetadata_SmallInputStreamOptimization() throws assertEquals(expectedBase64String, metadata.get(CHECKPOINT_FILE_DATA_KEY)); } + private static class TestClass implements Serializable { + private TestClass(String name, String value) { + this.name = name; + this.value = value; + } + + private final String name; + private final String value; + + @Override + public String toString() { + return "TestClass{ name: " + name + ", value: " + value + " }"; + } + } + + private static class MockAsyncFsContainer extends FsBlobContainer implements AsyncMultiStreamBlobContainer { + + private BlobContainer delegate; + + public MockAsyncFsContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) { + super(blobStore, blobPath, path); + delegate = blobStore.blobContainer(BlobPath.cleanPath()); + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + InputStream inputStream = writeContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream(); + delegate.writeBlob(writeContext.getFileName(), inputStream, writeContext.getFileSize(), true); + completionListener.onResponse(null); + } + + @Override + public void readBlobAsync(String blobName, ActionListener listener) { + throw new RuntimeException("read not supported"); + } + + @Override + public boolean remoteIntegrityCheckSupported() { + return false; + } + + public BlobContainer getDelegate() { + return delegate; + } + } }