From 6e9985f49a78de60cd5d45ee572fcae0ae4f1f35 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 14 Aug 2021 17:07:40 +0200 Subject: [PATCH] Optimize Away Small Resumable Uploads in GCS Repository (#74813) (#76525) Similar to ChunkedOutputStream we can optimize away small resumable uploads to speed up small meta writes (and improve stability with JDK-8 in 7.x). --- ...eCloudStorageBlobStoreRepositoryTests.java | 2 +- .../gcs/GoogleCloudStorageBlobStore.java | 99 +++++++++++++++---- .../gcs/GoogleCloudStorageRepository.java | 2 +- ...CloudStorageBlobContainerRetriesTests.java | 3 +- ...leCloudStorageBlobStoreContainerTests.java | 3 +- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index f4ecd1099e19a..73bc9a49068a2 100644 --- a/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -241,7 +241,7 @@ public Map getRepositories(Environment env, NamedXCo @Override protected GoogleCloudStorageBlobStore createBlobStore() { return new GoogleCloudStorageBlobStore( - metadata.settings().get("bucket"), "test", metadata.name(), storageService, + metadata.settings().get("bucket"), "test", metadata.name(), storageService, bigArrays, randomIntBetween(1, 8) * 1024) { @Override long getLargeBlobThresholdInBytes() { diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 44423806e4300..6f14dcad042e8 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -22,6 +22,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; @@ -94,16 +96,19 @@ class GoogleCloudStorageBlobStore implements BlobStore { private final GoogleCloudStorageService storageService; private final GoogleCloudStorageOperationsStats stats; private final int bufferSize; + private final BigArrays bigArrays; GoogleCloudStorageBlobStore(String bucketName, String clientName, String repositoryName, GoogleCloudStorageService storageService, + BigArrays bigArrays, int bufferSize) { this.bucketName = bucketName; this.clientName = clientName; this.repositoryName = repositoryName; this.storageService = storageService; + this.bigArrays = bigArrays; this.stats = new GoogleCloudStorageOperationsStats(bucketName); this.bufferSize = bufferSize; } @@ -234,7 +239,12 @@ void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExist writeBlobResumable(BlobInfo.newBuilder(bucketName, blobName).setMd5(md5).build(), bytes.streamInput(), bytes.length(), failIfAlreadyExists); } else { - writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build()); + final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build(); + if (bytes.hasArray()) { + writeBlobMultipart(blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists); + } else { + writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo); + } } } @@ -252,7 +262,9 @@ private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlr if (blobSize > getLargeBlobThresholdInBytes()) { writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists); } else { - writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists); + final byte[] buffer = new byte[Math.toIntExact(blobSize)]; + Streams.readFully(inputStream, buffer); + writeBlobMultipart(blobInfo, buffer, 0, Math.toIntExact(blobSize), failIfAlreadyExists); } } @@ -275,23 +287,71 @@ void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer client().writer(blobInfo, writeOptions)); - writer.accept(new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) { + // we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush + // the buffer to it and keep writing to the resumable upload. If we never exceed the large blob threshold we just write the + // buffer via a standard blob write + try (ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(bigArrays)) { + final AtomicReference channelRef = new AtomicReference<>(); + writer.accept(new OutputStream() { + + private OutputStream resumableStream; + + @Override + public void write(int b) throws IOException { + if (resumableStream != null) { + resumableStream.write(b); + } else { + if (buffer.size() + 1 > getLargeBlobThresholdInBytes()) { + initResumableStream(); + resumableStream.write(b); + } else { + buffer.write(b); + } + } + } + @Override public void write(byte[] b, int off, int len) throws IOException { - int written = 0; - while (written < len) { - // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK - // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE - final int toWrite = Math.min(len - written, 60 * 256 * 1024); - out.write(b, off + written, toWrite); - written += toWrite; + if (resumableStream != null) { + resumableStream.write(b, off, len); + } else { + if (buffer.size() + len > getLargeBlobThresholdInBytes()) { + initResumableStream(); + resumableStream.write(b, off, len); + } else { + buffer.write(b, off, len); + } } } + + private void initResumableStream() throws IOException { + final WriteChannel writeChannel = + SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions)); + channelRef.set(writeChannel); + resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) { + @Override + public void write(byte[] b, int off, int len) throws IOException { + int written = 0; + while (written < len) { + // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK + // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE + final int toWrite = Math.min(len - written, 60 * 256 * 1024); + out.write(b, off + written, toWrite); + written += toWrite; + } + } + }; + buffer.bytes().writeTo(resumableStream); + buffer.close(); + } }); - SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - stats.trackPutOperation(); + final WritableByteChannel writeChannel = channelRef.get(); + if (writeChannel != null) { + SocketAccess.doPrivilegedVoidIOException(writeChannel::close); + stats.trackPutOperation(); + } else { + writeBlob(blobName, buffer.bytes(), failIfAlreadyExists); + } return; } catch (final StorageException se) { final int errorCode = se.getCode(); @@ -378,22 +438,21 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long * 'multipart/related' request containing both data and metadata. The request is * gziped), see: * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload - * @param blobInfo the info for the blob to be uploaded - * @param inputStream the stream containing the blob data + * @param blobInfo the info for the blob to be uploaded + * @param buffer the byte array containing the data + * @param offset offset at which the blob contents start in the buffer * @param blobSize the size * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, int blobSize, boolean failIfAlreadyExists) throws IOException { assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method"; - final byte[] buffer = new byte[Math.toIntExact(blobSize)]; - Streams.readFully(inputStream, buffer); try { final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ? new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } : new Storage.BlobTargetOption[0]; SocketAccess.doPrivilegedVoidIOException( - () -> client().create(blobInfo, buffer, targetOptions)); + () -> client().create(blobInfo, buffer, offset, blobSize, targetOptions)); // We don't track this operation on the http layer as // we do with the GET/LIST operations since this operations // can trigger multiple underlying http requests but only one diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index a6460019d8904..d5be2ae35b4d6 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -96,7 +96,7 @@ private static Map buildLocation(RepositoryMetadata metadata) { @Override protected GoogleCloudStorageBlobStore createBlobStore() { - return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize); + return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bigArrays, bufferSize); } @Override diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index bec41d2f332fb..319d8ac630ea4 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -15,6 +15,7 @@ import fixture.gcs.FakeOAuth2HttpHandler; import org.apache.http.HttpStatus; import org.elasticsearch.jdk.JavaVersion; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.core.SuppressForbidden; @@ -154,7 +155,7 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien httpServer.createContext("/token", new FakeOAuth2HttpHandler()); final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service, - randomIntBetween(1, 8) * 1024); + BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024); return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore); } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index a1d99cd266302..a4bb5973ff8b9 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -77,7 +78,7 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage); try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService, - randomIntBetween(1, 8) * 1024)) { + BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024)) { final BlobContainer container = store.blobContainer(BlobPath.EMPTY); IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs.iterator()));