Optimize Away Small Resumable Uploads in GCS Repository (#74813)
Similar to ChunkedOutputStream, we can optimize away small resumable uploads
to speed up small metadata writes (and improve stability with JDK-8 in 7.x).
original-brownbear authored Jul 1, 2021
1 parent 1ea7e50 commit 85f0a02
Showing 5 changed files with 85 additions and 24 deletions.
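Before the diff, here is a minimal sketch of the pattern the commit applies, stripped of the Elasticsearch and GCS SDK plumbing. ThresholdBufferingOutputStream and UploadTarget are hypothetical stand-ins: the real code buffers into a ReleasableBytesStreamOutput backed by BigArrays and opens a WriteChannel via client().writer(...).

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Sketch of the buffering strategy: writes are held in memory until they would
    // exceed the large-blob threshold; only then is the (expensive) resumable upload
    // opened and the buffered bytes replayed into it. If the threshold is never
    // crossed, close() sends everything as one cheap single-request upload instead.
    final class ThresholdBufferingOutputStream extends OutputStream {

        interface UploadTarget {
            OutputStream openResumableUpload() throws IOException;         // stand-in for client().writer(...)
            void uploadSmallBlob(byte[] data, int len) throws IOException; // stand-in for client().create(...)
        }

        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final UploadTarget target;
        private final int largeBlobThreshold;
        private OutputStream resumableStream; // non-null once the threshold has been crossed

        ThresholdBufferingOutputStream(UploadTarget target, int largeBlobThreshold) {
            this.target = target;
            this.largeBlobThreshold = largeBlobThreshold;
        }

        @Override
        public void write(int b) throws IOException {
            write(new byte[] { (byte) b }, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (resumableStream == null && buffer.size() + len > largeBlobThreshold) {
                // crossing the threshold: open the resumable upload and replay the buffer
                resumableStream = target.openResumableUpload();
                buffer.writeTo(resumableStream);
                buffer.reset();
            }
            if (resumableStream != null) {
                resumableStream.write(b, off, len);
            } else {
                buffer.write(b, off, len);
            }
        }

        @Override
        public void close() throws IOException {
            if (resumableStream != null) {
                resumableStream.close(); // finalizes the resumable session
            } else {
                target.uploadSmallBlob(buffer.toByteArray(), buffer.size()); // single request, no session
            }
        }
    }

One difference worth noting: in the actual change below, the small-blob path runs after the writer callback returns (falling through to the BytesReference overload of writeBlob) rather than in close().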
@@ -226,7 +226,7 @@ public Map<String, Repository.Factory> getRepositories(Environment env, NamedXCo
         @Override
         protected GoogleCloudStorageBlobStore createBlobStore() {
             return new GoogleCloudStorageBlobStore(
-                metadata.settings().get("bucket"), "test", metadata.name(), storageService,
+                metadata.settings().get("bucket"), "test", metadata.name(), storageService, bigArrays,
                 randomIntBetween(1, 8) * 1024) {
                 @Override
                 long getLargeBlobThresholdInBytes() {
@@ -22,6 +22,8 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -94,16 +96,19 @@ class GoogleCloudStorageBlobStore implements BlobStore
     private final GoogleCloudStorageService storageService;
     private final GoogleCloudStorageOperationsStats stats;
     private final int bufferSize;
+    private final BigArrays bigArrays;
 
     GoogleCloudStorageBlobStore(String bucketName,
                                 String clientName,
                                 String repositoryName,
                                 GoogleCloudStorageService storageService,
+                                BigArrays bigArrays,
                                 int bufferSize) {
         this.bucketName = bucketName;
         this.clientName = clientName;
         this.repositoryName = repositoryName;
         this.storageService = storageService;
+        this.bigArrays = bigArrays;
         this.stats = new GoogleCloudStorageOperationsStats(bucketName);
         this.bufferSize = bufferSize;
     }
@@ -234,7 +239,12 @@ void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExist
             writeBlobResumable(BlobInfo.newBuilder(bucketName, blobName).setMd5(md5).build(), bytes.streamInput(), bytes.length(),
                 failIfAlreadyExists);
         } else {
-            writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
+            final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
+            if (bytes.hasArray()) {
+                writeBlobMultipart(blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists);
+            } else {
+                writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo);
+            }
         }
     }
 
@@ -252,7 +262,9 @@ private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlr
         if (blobSize > getLargeBlobThresholdInBytes()) {
             writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
         } else {
-            writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
+            final byte[] buffer = new byte[Math.toIntExact(blobSize)];
+            Streams.readFully(inputStream, buffer);
+            writeBlobMultipart(blobInfo, buffer, 0, Math.toIntExact(blobSize), failIfAlreadyExists);
         }
     }
 
@@ -275,23 +287,71 @@ void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer<Out
         StorageException storageException = null;
 
         for (int retry = 0; retry < 3; ++retry) {
-            try {
-                final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-                writer.accept(new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
-                    @Override
-                    public void write(byte[] b, int off, int len) throws IOException {
-                        int written = 0;
-                        while (written < len) {
-                            // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
-                            // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
-                            final int toWrite = Math.min(len - written, 60 * 256 * 1024);
-                            out.write(b, off + written, toWrite);
-                            written += toWrite;
-                        }
-                    }
-                });
-                SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                stats.trackPutOperation();
+            // we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush
+            // the buffer to it and keep writing to the resumable upload. If we never exceed the large blob threshold we just write the
+            // buffer via a standard blob write
+            try (ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(bigArrays)) {
+                final AtomicReference<WriteChannel> channelRef = new AtomicReference<>();
+                writer.accept(new OutputStream() {
+
+                    private OutputStream resumableStream;
+
+                    @Override
+                    public void write(int b) throws IOException {
+                        if (resumableStream != null) {
+                            resumableStream.write(b);
+                        } else {
+                            if (buffer.size() + 1 > getLargeBlobThresholdInBytes()) {
+                                initResumableStream();
+                                resumableStream.write(b);
+                            } else {
+                                buffer.write(b);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        if (resumableStream != null) {
+                            resumableStream.write(b, off, len);
+                        } else {
+                            if (buffer.size() + len > getLargeBlobThresholdInBytes()) {
+                                initResumableStream();
+                                resumableStream.write(b, off, len);
+                            } else {
+                                buffer.write(b, off, len);
+                            }
+                        }
+                    }
+
+                    private void initResumableStream() throws IOException {
+                        final WriteChannel writeChannel =
+                            SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                        channelRef.set(writeChannel);
+                        resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
+                            @Override
+                            public void write(byte[] b, int off, int len) throws IOException {
+                                int written = 0;
+                                while (written < len) {
+                                    // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
+                                    // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
+                                    final int toWrite = Math.min(len - written, 60 * 256 * 1024);
+                                    out.write(b, off + written, toWrite);
+                                    written += toWrite;
+                                }
+                            }
+                        };
+                        buffer.bytes().writeTo(resumableStream);
+                        buffer.close();
+                    }
+                });
+                final WritableByteChannel writeChannel = channelRef.get();
+                if (writeChannel != null) {
+                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    stats.trackPutOperation();
+                } else {
+                    writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                }
                 return;
             } catch (final StorageException se) {
                 final int errorCode = se.getCode();
@@ -378,22 +438,21 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
      * 'multipart/related' request containing both data and metadata. The request is
      * gziped), see:
      * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
      * @param blobInfo the info for the blob to be uploaded
-     * @param inputStream the stream containing the blob data
+     * @param buffer the byte array containing the data
+     * @param offset offset at which the blob contents start in the buffer
      * @param blobSize the size
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
-    private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
+    private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, int blobSize, boolean failIfAlreadyExists)
         throws IOException {
         assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
-        final byte[] buffer = new byte[Math.toIntExact(blobSize)];
-        Streams.readFully(inputStream, buffer);
         try {
             final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
                 new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
                 new Storage.BlobTargetOption[0];
             SocketAccess.doPrivilegedVoidIOException(
-                () -> client().create(blobInfo, buffer, targetOptions));
+                () -> client().create(blobInfo, buffer, offset, blobSize, targetOptions));
             // We don't track this operation on the http layer as
             // we do with the GET/LIST operations since this operations
             // can trigger multiple underlying http requests but only one
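As a rough usage illustration (blobStore and the payload names are hypothetical; the call shape follows the consumer-based writeBlob in the hunk above), a small metadata write now completes without ever opening a WriteChannel:

    // Stays below getLargeBlobThresholdInBytes(): the bytes never leave the in-memory
    // buffer during the callback; once the writer returns, the store falls through to
    // writeBlob(blobName, buffer.bytes(), failIfAlreadyExists) and a single
    // writeBlobMultipart(...) request instead of a resumable upload session.
    blobStore.writeBlob("snapshot-meta.dat", true, out -> out.write(metadataBytes));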
@@ -95,7 +95,7 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
 
     @Override
     protected GoogleCloudStorageBlobStore createBlobStore() {
-        return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize);
+        return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bigArrays, bufferSize);
     }
 
     @Override
@@ -14,6 +14,7 @@
 import com.sun.net.httpserver.HttpHandler;
 import fixture.gcs.FakeOAuth2HttpHandler;
 import org.apache.http.HttpStatus;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.SuppressForbidden;
@@ -140,7 +141,7 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien
 
         httpServer.createContext("/token", new FakeOAuth2HttpHandler());
         final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service,
-            randomIntBetween(1, 8) * 1024);
+            BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024);
 
         return new GoogleCloudStorageBlobContainer(BlobPath.EMPTY, blobStore);
     }
@@ -18,6 +18,7 @@
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
@@ -77,7 +78,7 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti
         when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage);
 
         try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService,
-            randomIntBetween(1, 8) * 1024)) {
+            BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024)) {
             final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
 
             IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs.iterator()));
