From b77b32e0d21be2a5dd0f8b1fb04a20e9c2ac3162 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Dec 2020 11:28:10 +0100 Subject: [PATCH 1/2] Add Ability to Write a BytesReference to BlobContainer Except when writing actual segment files to the blob store we always write `BytesReference` instead of a stream. Only having the stream API available forces needless copies on us. I fixed the straight-forward needless copying for HDFS and FS repos in this PR, we could do similar fixes for GCS and Azure as well and thus significantly reduce the peak memory use of these writes on master nodes in particular. --- .../blobstore/url/URLBlobContainer.java | 3 +- .../azure/AzureBlobStoreRepositoryTests.java | 9 ++---- .../azure/AzureBlobContainer.java | 5 +-- .../gcs/GoogleCloudStorageBlobContainer.java | 5 +-- .../repositories/hdfs/HdfsBlobContainer.java | 28 ++++++++++++++-- .../s3/S3BlobStoreRepositoryTests.java | 8 ++--- .../repositories/s3/S3BlobContainer.java | 5 +-- .../BlobStoreRepositoryCleanupIT.java | 6 ++-- .../common/blobstore/BlobContainer.java | 32 +++++++++++++------ .../common/blobstore/fs/FsBlobContainer.java | 27 ++++++++++++++-- .../support/FilterBlobContainer.java | 5 +-- .../blobstore/BlobStoreRepository.java | 17 +++------- .../blobstore/ChecksumBlobStoreFormat.java | 3 +- .../snapshots/BlobStoreFormatTests.java | 6 +--- .../MockEventuallyConsistentRepository.java | 7 ++-- .../ESBlobStoreRepositoryIntegTestCase.java | 13 +++----- .../snapshots/mockstore/MockRepository.java | 7 ++-- .../index/store/cache/TestUtils.java | 2 +- 18 files changed, 116 insertions(+), 72 deletions(-) diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index d7db3960e67ad..abe66e6cc7cf8 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import java.io.BufferedInputStream; import java.io.FileNotFoundException; @@ -132,7 +133,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 340123a9108af..fec82f8598de3 100644 --- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; @@ -36,9 +37,7 @@ import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Collection; @@ -221,10 +220,8 @@ public void testLargeBlobCountDeletion() throws Exception { final BlobContainer container = store.blobContainer(new BlobPath()); for (int i = 0; i < numberOfBlobs; i++) { byte[] bytes = randomBytes(randomInt(100)); - try (InputStream inputStream = new ByteArrayInputStream(bytes)) { - String blobName = randomAlphaOfLength(10); - container.writeBlob(blobName, inputStream, bytes.length, false); - } + String blobName = randomAlphaOfLength(10); + container.writeBlob(blobName, new BytesArray(bytes), false); } container.delete(); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 734ad8bc8c205..870df48c35edb 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; @@ -101,8 +102,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index dd225d2c1ccba..452dbda55648b 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; @@ -83,8 +84,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 4703cdc296dec..ae7d9872a1de3 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation; @@ -150,12 +151,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + Path blob = new Path(path, blobName); + // we pass CREATE, which means it fails if a blob already exists. + final EnumSet flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) + : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK); + store.execute((Operation) fileContext -> { + try { + writeToPath(bytes, blob, fileContext, flags); + } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { + throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage()); + } + return null; + }); + } + + @Override + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = FsBlobContainer.tempBlobName(blobName); final Path tempBlobPath = new Path(path, tempBlob); final Path blob = new Path(path, blobName); store.execute((Operation) fileContext -> { - writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)); + writeToPath(bytes, tempBlobPath, fileContext, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)); try { fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE); } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { @@ -165,6 +182,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS }); } + private void writeToPath(BytesReference bytes, Path blobPath, FileContext fileContext, + EnumSet createFlags) throws IOException { + try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags)) { + bytes.writeTo(stream); + } + } + private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath, EnumSet createFlags) throws IOException { final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]; diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 71c9fe0fd346b..947971edccb7b 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -54,7 +54,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -158,12 +157,9 @@ public void testEnforcedCooldownPeriod() throws IOException { SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT)); - PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { - try (InputStream stream = serialized.streamInput()) { + PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( - BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true); - } - }))); + BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), serialized, true)))); final String newSnapshotName = "snapshot-new"; final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index e879563891fb4..2baa26a790694 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -143,8 +144,8 @@ long getLargeBlobThresholdInBytes() { } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java index dd629f29313f1..4bacf7329c73a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -22,12 +22,12 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; -import java.io.ByteArrayInputStream; import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; @@ -85,7 +85,7 @@ private String startBlockedCleanup(String repoName) throws Exception { logger.info("--> creating a garbage data blob"); final PlainActionFuture garbageFuture = PlainActionFuture.newFuture(); repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore() - .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true))); + .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new BytesArray(new byte[1]), true))); garbageFuture.get(); blockMasterFromFinalizingSnapshotOnIndexFile(repoName); @@ -120,7 +120,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti final int generation = i; repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore() .blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation, - new ByteArrayInputStream(new byte[1]), 1, true))); + new BytesArray(new byte[1]), true))); createOldIndexNFuture.get(); } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 6bf35e94ecd15..fe4048e1cd205 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.blobstore; +import org.elasticsearch.common.bytes.BytesReference; + import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; @@ -110,25 +112,35 @@ default long readBlobPreferredLength() { void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; /** - * Reads blob content from the input stream and writes it to the container in a new blob with the given name, - * using an atomic write operation if the implementation supports it. + * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name. * - * This method assumes the container does not already contain a blob of the same blobName. If a blob by the - * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param bytes + * The bytes to write + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); + } + + /** + * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name, + * using an atomic write operation if the implementation supports it. * * @param blobName * The name of the blob to write the contents of the input stream to. - * @param inputStream - * The input stream from which to retrieve the bytes to write to the blob. - * @param blobSize - * The size of the blob to be written, in bytes. It is implementation dependent whether - * this value is used in writing the blob to the repository. + * @param bytes + * The bytes to write * @param failIfAlreadyExists * whether to throw a FileAlreadyExistsException if the given blob already exists * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException; /** * Deletes this container and all its contents from the repository. diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index e7098a50a9a3b..86658a9799ae2 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.core.internal.io.IOUtils; @@ -190,12 +191,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) + public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + final Path file = path.resolve(blobName); + try { + writeToPath(bytes, file); + } catch (FileAlreadyExistsException faee) { + if (failIfAlreadyExists) { + throw faee; + } + deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName)); + writeToPath(bytes, file); + } + IOUtils.fsync(path, true); + } + + @Override + public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = tempBlobName(blobName); final Path tempBlobPath = path.resolve(tempBlob); try { - writeToPath(inputStream, tempBlobPath, blobSize); + writeToPath(bytes, tempBlobPath); + IOUtils.fsync(tempBlobPath, false); moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists); } catch (IOException ex) { try { @@ -209,6 +226,12 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } } + private void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException { + try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { + bytes.writeTo(outputStream); + } + } + private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { final int bufferSize = blobStore.bufferSizeInBytes(); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java index 0ca5aa5a5e444..1d9d200251ddc 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; @@ -72,8 +73,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 31c3325eaf20c..5ec2532712045 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1273,9 +1273,7 @@ public String startVerification() { byte[] testBytes = Strings.toUTF8Bytes(seed); BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); BytesArray bytes = new BytesArray(testBytes); - try (InputStream stream = bytes.streamInput()) { - testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true); - } + testContainer.writeBlobAtomic("master.dat", new BytesArray(testBytes), true); return seed; } } catch (Exception exp) { @@ -1880,11 +1878,9 @@ private long latestGeneration(Collection rootBlobs) { private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException { - try (InputStream stream = bytesRef.streamInput()) { - logger.trace(() -> - new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path())); - container.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists); - } + logger.trace(() -> + new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path())); + container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists); } @Override @@ -2291,10 +2287,7 @@ public void verify(String seed, DiscoveryNode localNode) { } else { BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); try { - BytesArray bytes = new BytesArray(seed); - try (InputStream stream = bytes.streamInput()) { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true); - } + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed), true); } catch (Exception exp) { throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 6037e73750933..38744c91adf11 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -150,8 +150,7 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr */ public void write(T obj, BlobContainer blobContainer, String name, boolean compress, BigArrays bigArrays) throws IOException { final String blobName = blobName(name); - serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), - false)); + serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes, false)); } public void serialize(final T obj, final String blobName, final boolean compress, BigArrays bigArrays, diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index fd8bc3d198eb8..e40db3b069e30 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentFragment; @@ -166,10 +165,7 @@ protected void randomCorruption(BlobContainer blobContainer, String blobName) th int location = randomIntBetween(0, buffer.length - 1); buffer[location] = (byte) (buffer[location] ^ 42); } while (originalChecksum == checksum(buffer)); - BytesArray bytesArray = new BytesArray(buffer); - try (StreamInput stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length(), false); - } + blobContainer.writeBlob(blobName, new BytesArray(buffer), false); } private long checksum(byte[] buffer) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index b26af7a7c3d9d..fa5082319b4b0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.MockBigArrays; @@ -378,9 +379,9 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, - final boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(final String blobName, final BytesReference bytes, + final boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 79684646bbd2f..5fdd42a674879 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -204,14 +204,13 @@ public void testDeleteBlobs() throws IOException { } public static void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray, - boolean failIfAlreadyExists) throws IOException { - try (InputStream stream = bytesArray.streamInput()) { - if (randomBoolean()) { + boolean failIfAlreadyExists) throws IOException { + if (randomBoolean()) { + try (InputStream stream = bytesArray.streamInput()) { container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); - } else { - container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists); } } + container.writeBlobAtomic(blobName, bytesArray, failIfAlreadyExists); } public void testContainerCreationAndDeletion() throws IOException { @@ -257,9 +256,7 @@ public static byte[] randomBytes(int length) { } protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException { - try (InputStream stream = bytesArray.streamInput()) { - container.writeBlob(blobName, stream, bytesArray.length(), true); - } + container.writeBlob(blobName, bytesArray, true); } protected BlobStore newBlobStore() { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index c9516ea59fe7f..9440fe58b8e2e 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -477,7 +478,7 @@ && path().equals(basePath()) == false) { } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + public void writeBlobAtomic(final String blobName, final BytesReference bytes, final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { @@ -493,7 +494,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); - super.writeBlob(tempBlobName, inputStream, blobSize, failIfAlreadyExists); + super.writeBlob(tempBlobName, bytes, failIfAlreadyExists); maybeIOExceptionOrBlock(blobName); final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate(); fsBlobContainer.moveBlobAtomic(tempBlobName, blobName, failIfAlreadyExists); @@ -501,7 +502,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream // Atomic write since it is potentially supported // by the delegating blob container maybeIOExceptionOrBlock(blobName); - super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); + super.writeBlobAtomic(blobName, bytes, failIfAlreadyExists); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index f278f58c939b4..0bf331a9a482f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -267,7 +267,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) { throw unsupportedException(); } From 49e8a1d6350e71cc65f7132cd914c974de8ade66 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Dec 2020 16:26:42 +0100 Subject: [PATCH 2/2] fixed random oversights --- .../elasticsearch/common/blobstore/fs/FsBlobContainer.java | 5 ++--- .../blobstore/ESBlobStoreRepositoryIntegTestCase.java | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 86658a9799ae2..7cbbd226c7176 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -206,13 +206,11 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea } @Override - public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) - throws IOException { + public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = tempBlobName(blobName); final Path tempBlobPath = path.resolve(tempBlob); try { writeToPath(bytes, tempBlobPath); - IOUtils.fsync(tempBlobPath, false); moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists); } catch (IOException ex) { try { @@ -230,6 +228,7 @@ private void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOExcep try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { bytes.writeTo(outputStream); } + IOUtils.fsync(tempBlobPath, false); } private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 5fdd42a674879..1c529822504b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -206,11 +206,10 @@ public void testDeleteBlobs() throws IOException { public static void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray, boolean failIfAlreadyExists) throws IOException { if (randomBoolean()) { - try (InputStream stream = bytesArray.streamInput()) { - container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); - } + container.writeBlob(blobName, bytesArray, failIfAlreadyExists); + } else { + container.writeBlobAtomic(blobName, bytesArray, failIfAlreadyExists); } - container.writeBlobAtomic(blobName, bytesArray, failIfAlreadyExists); } public void testContainerCreationAndDeletion() throws IOException {