diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index d7db3960e67ad..abe66e6cc7cf8 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import java.io.BufferedInputStream; import java.io.FileNotFoundException; @@ -132,7 +133,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 340123a9108af..fec82f8598de3 100644 --- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; @@ -36,9 +37,7 @@ import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Collection; @@ -221,10 +220,8 @@ public void testLargeBlobCountDeletion() throws Exception { final BlobContainer container = store.blobContainer(new BlobPath()); for (int i = 0; i < numberOfBlobs; i++) { byte[] bytes = randomBytes(randomInt(100)); - try (InputStream inputStream = new ByteArrayInputStream(bytes)) { - String blobName = randomAlphaOfLength(10); - container.writeBlob(blobName, inputStream, bytes.length, false); - } + String blobName = randomAlphaOfLength(10); + container.writeBlob(blobName, new BytesArray(bytes), false); } container.delete(); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 734ad8bc8c205..870df48c35edb 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; @@ -101,8 +102,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index dd225d2c1ccba..452dbda55648b 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; @@ -83,8 +84,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 4703cdc296dec..ae7d9872a1de3 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation; @@ -150,12 +151,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + Path blob = new Path(path, blobName); + // we pass CREATE, which means it fails if a blob already exists. + final EnumSet flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) + : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK); + store.execute((Operation) fileContext -> { + try { + writeToPath(bytes, blob, fileContext, flags); + } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { + throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage()); + } + return null; + }); + } + + @Override + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = FsBlobContainer.tempBlobName(blobName); final Path tempBlobPath = new Path(path, tempBlob); final Path blob = new Path(path, blobName); store.execute((Operation) fileContext -> { - writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)); + writeToPath(bytes, tempBlobPath, fileContext, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)); try { fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE); } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { @@ -165,6 +182,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS }); } + private void writeToPath(BytesReference bytes, Path blobPath, FileContext fileContext, + EnumSet createFlags) throws IOException { + try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags)) { + bytes.writeTo(stream); + } + } + private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath, EnumSet createFlags) throws IOException { final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]; diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 71c9fe0fd346b..947971edccb7b 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -54,7 +54,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -158,12 +157,9 @@ public void testEnforcedCooldownPeriod() throws IOException { SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT)); - PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { - try (InputStream stream = serialized.streamInput()) { + PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( - BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true); - } - }))); + BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), serialized, true)))); final String newSnapshotName = "snapshot-new"; final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index e879563891fb4..2baa26a790694 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -143,8 +144,8 @@ long getLargeBlobThresholdInBytes() { } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java index dd629f29313f1..4bacf7329c73a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -22,12 +22,12 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; -import java.io.ByteArrayInputStream; import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; @@ -85,7 +85,7 @@ private String startBlockedCleanup(String repoName) throws Exception { logger.info("--> creating a garbage data blob"); final PlainActionFuture garbageFuture = PlainActionFuture.newFuture(); repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore() - .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true))); + .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new BytesArray(new byte[1]), true))); garbageFuture.get(); blockMasterFromFinalizingSnapshotOnIndexFile(repoName); @@ -120,7 +120,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti final int generation = i; repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore() .blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation, - new ByteArrayInputStream(new byte[1]), 1, true))); + new BytesArray(new byte[1]), true))); createOldIndexNFuture.get(); } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 6bf35e94ecd15..fe4048e1cd205 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.blobstore; +import org.elasticsearch.common.bytes.BytesReference; + import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; @@ -110,25 +112,35 @@ default long readBlobPreferredLength() { void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; /** - * Reads blob content from the input stream and writes it to the container in a new blob with the given name, - * using an atomic write operation if the implementation supports it. + * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name. * - * This method assumes the container does not already contain a blob of the same blobName. If a blob by the - * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param bytes + * The bytes to write + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); + } + + /** + * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name, + * using an atomic write operation if the implementation supports it. * * @param blobName * The name of the blob to write the contents of the input stream to. - * @param inputStream - * The input stream from which to retrieve the bytes to write to the blob. - * @param blobSize - * The size of the blob to be written, in bytes. It is implementation dependent whether - * this value is used in writing the blob to the repository. + * @param bytes + * The bytes to write * @param failIfAlreadyExists * whether to throw a FileAlreadyExistsException if the given blob already exists * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException; /** * Deletes this container and all its contents from the repository. diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index e7098a50a9a3b..7cbbd226c7176 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.core.internal.io.IOUtils; @@ -190,12 +191,26 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) - throws IOException { + public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + final Path file = path.resolve(blobName); + try { + writeToPath(bytes, file); + } catch (FileAlreadyExistsException faee) { + if (failIfAlreadyExists) { + throw faee; + } + deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName)); + writeToPath(bytes, file); + } + IOUtils.fsync(path, true); + } + + @Override + public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final String tempBlob = tempBlobName(blobName); final Path tempBlobPath = path.resolve(tempBlob); try { - writeToPath(inputStream, tempBlobPath, blobSize); + writeToPath(bytes, tempBlobPath); moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists); } catch (IOException ex) { try { @@ -209,6 +224,13 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } } + private void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException { + try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { + bytes.writeTo(outputStream); + } + IOUtils.fsync(tempBlobPath, false); + } + private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { final int bufferSize = blobStore.bufferSizeInBytes(); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java index 0ca5aa5a5e444..1d9d200251ddc 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; @@ -72,8 +73,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { - delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { + delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 31c3325eaf20c..5ec2532712045 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1273,9 +1273,7 @@ public String startVerification() { byte[] testBytes = Strings.toUTF8Bytes(seed); BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); BytesArray bytes = new BytesArray(testBytes); - try (InputStream stream = bytes.streamInput()) { - testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true); - } + testContainer.writeBlobAtomic("master.dat", new BytesArray(testBytes), true); return seed; } } catch (Exception exp) { @@ -1880,11 +1878,9 @@ private long latestGeneration(Collection rootBlobs) { private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException { - try (InputStream stream = bytesRef.streamInput()) { - logger.trace(() -> - new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path())); - container.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists); - } + logger.trace(() -> + new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path())); + container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists); } @Override @@ -2291,10 +2287,7 @@ public void verify(String seed, DiscoveryNode localNode) { } else { BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); try { - BytesArray bytes = new BytesArray(seed); - try (InputStream stream = bytes.streamInput()) { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true); - } + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed), true); } catch (Exception exp) { throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 6037e73750933..38744c91adf11 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -150,8 +150,7 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr */ public void write(T obj, BlobContainer blobContainer, String name, boolean compress, BigArrays bigArrays) throws IOException { final String blobName = blobName(name); - serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), - false)); + serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes, false)); } public void serialize(final T obj, final String blobName, final boolean compress, BigArrays bigArrays, diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index fd8bc3d198eb8..e40db3b069e30 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentFragment; @@ -166,10 +165,7 @@ protected void randomCorruption(BlobContainer blobContainer, String blobName) th int location = randomIntBetween(0, buffer.length - 1); buffer[location] = (byte) (buffer[location] ^ 42); } while (originalChecksum == checksum(buffer)); - BytesArray bytesArray = new BytesArray(buffer); - try (StreamInput stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length(), false); - } + blobContainer.writeBlob(blobName, new BytesArray(buffer), false); } private long checksum(byte[] buffer) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index b26af7a7c3d9d..fa5082319b4b0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.MockBigArrays; @@ -378,9 +379,9 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, - final boolean failIfAlreadyExists) throws IOException { - writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + public void writeBlobAtomic(final String blobName, final BytesReference bytes, + final boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, bytes, failIfAlreadyExists); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 79684646bbd2f..1c529822504b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -204,13 +204,11 @@ public void testDeleteBlobs() throws IOException { } public static void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray, - boolean failIfAlreadyExists) throws IOException { - try (InputStream stream = bytesArray.streamInput()) { - if (randomBoolean()) { - container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); - } else { - container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists); - } + boolean failIfAlreadyExists) throws IOException { + if (randomBoolean()) { + container.writeBlob(blobName, bytesArray, failIfAlreadyExists); + } else { + container.writeBlobAtomic(blobName, bytesArray, failIfAlreadyExists); } } @@ -257,9 +255,7 @@ public static byte[] randomBytes(int length) { } protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException { - try (InputStream stream = bytesArray.streamInput()) { - container.writeBlob(blobName, stream, bytesArray.length(), true); - } + container.writeBlob(blobName, bytesArray, true); } protected BlobStore newBlobStore() { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index c9516ea59fe7f..9440fe58b8e2e 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -477,7 +478,7 @@ && path().equals(basePath()) == false) { } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + public void writeBlobAtomic(final String blobName, final BytesReference bytes, final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { @@ -493,7 +494,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); - super.writeBlob(tempBlobName, inputStream, blobSize, failIfAlreadyExists); + super.writeBlob(tempBlobName, bytes, failIfAlreadyExists); maybeIOExceptionOrBlock(blobName); final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate(); fsBlobContainer.moveBlobAtomic(tempBlobName, blobName, failIfAlreadyExists); @@ -501,7 +502,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream // Atomic write since it is potentially supported // by the delegating blob container maybeIOExceptionOrBlock(blobName); - super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); + super.writeBlobAtomic(blobName, bytes, failIfAlreadyExists); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index f278f58c939b4..0bf331a9a482f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -267,7 +267,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b } @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) { throw unsupportedException(); }