Skip to content

Commit

Permalink
Optimize Away Small Resumable Uploads in GCS Repository (#74813) (#76525
Browse files Browse the repository at this point in the history
)

Similar to ChunkedOutputStream we can optimize away small resumable uploads
to speed up small meta writes (and improve stability with JDK-8 in 7.x).
  • Loading branch information
original-brownbear authored Aug 14, 2021
1 parent 686557e commit 6e9985f
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public Map<String, Repository.Factory> getRepositories(Environment env, NamedXCo
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(
metadata.settings().get("bucket"), "test", metadata.name(), storageService,
metadata.settings().get("bucket"), "test", metadata.name(), storageService, bigArrays,
randomIntBetween(1, 8) * 1024) {
@Override
long getLargeBlobThresholdInBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -94,16 +96,19 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
private final int bufferSize;
private final BigArrays bigArrays;

/**
 * Creates a blob store backed by a Google Cloud Storage bucket.
 *
 * @param bucketName     name of the GCS bucket backing this store
 * @param clientName     name of the GCS client configuration to use
 * @param repositoryName name of the repository this store belongs to
 * @param storageService service that provides the GCS client instances
 * @param bigArrays      big-arrays instance used to buffer small writes before upload
 * @param bufferSize     buffer size in bytes for uploads — presumably the resumable-upload
 *                       chunk/threshold size; confirm against getLargeBlobThresholdInBytes usage
 */
GoogleCloudStorageBlobStore(String bucketName, String clientName, String repositoryName,
                            GoogleCloudStorageService storageService, BigArrays bigArrays, int bufferSize) {
    this.bufferSize = bufferSize;
    this.bigArrays = bigArrays;
    this.storageService = storageService;
    this.repositoryName = repositoryName;
    this.clientName = clientName;
    this.bucketName = bucketName;
    // per-bucket operation statistics, tracked for the repository stats API
    this.stats = new GoogleCloudStorageOperationsStats(bucketName);
}
Expand Down Expand Up @@ -234,7 +239,12 @@ void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExist
writeBlobResumable(BlobInfo.newBuilder(bucketName, blobName).setMd5(md5).build(), bytes.streamInput(), bytes.length(),
failIfAlreadyExists);
} else {
writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (bytes.hasArray()) {
writeBlobMultipart(blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists);
} else {
writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo);
}
}
}

Expand All @@ -252,7 +262,9 @@ private void writeBlob(InputStream inputStream, long blobSize, boolean failIfAlr
if (blobSize > getLargeBlobThresholdInBytes()) {
writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
} else {
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
final byte[] buffer = new byte[Math.toIntExact(blobSize)];
Streams.readFully(inputStream, buffer);
writeBlobMultipart(blobInfo, buffer, 0, Math.toIntExact(blobSize), failIfAlreadyExists);
}
}

Expand All @@ -275,23 +287,71 @@ void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer<Out
StorageException storageException = null;

for (int retry = 0; retry < 3; ++retry) {
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
writer.accept(new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
// we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush
// the buffer to it and keep writing to the resumable upload. If we never exceed the large blob threshold we just write the
// buffer via a standard blob write
try (ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(bigArrays)) {
final AtomicReference<WriteChannel> channelRef = new AtomicReference<>();
writer.accept(new OutputStream() {

private OutputStream resumableStream;

@Override
public void write(int b) throws IOException {
if (resumableStream != null) {
resumableStream.write(b);
} else {
if (buffer.size() + 1 > getLargeBlobThresholdInBytes()) {
initResumableStream();
resumableStream.write(b);
} else {
buffer.write(b);
}
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
int written = 0;
while (written < len) {
// at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
// see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
final int toWrite = Math.min(len - written, 60 * 256 * 1024);
out.write(b, off + written, toWrite);
written += toWrite;
if (resumableStream != null) {
resumableStream.write(b, off, len);
} else {
if (buffer.size() + len > getLargeBlobThresholdInBytes()) {
initResumableStream();
resumableStream.write(b, off, len);
} else {
buffer.write(b, off, len);
}
}
}

private void initResumableStream() throws IOException {
final WriteChannel writeChannel =
SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
channelRef.set(writeChannel);
resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
@Override
public void write(byte[] b, int off, int len) throws IOException {
int written = 0;
while (written < len) {
// at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
// see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
final int toWrite = Math.min(len - written, 60 * 256 * 1024);
out.write(b, off + written, toWrite);
written += toWrite;
}
}
};
buffer.bytes().writeTo(resumableStream);
buffer.close();
}
});
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.trackPutOperation();
final WritableByteChannel writeChannel = channelRef.get();
if (writeChannel != null) {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
stats.trackPutOperation();
} else {
writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
}
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
Expand Down Expand Up @@ -378,22 +438,21 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
* 'multipart/related' request containing both data and metadata. The request is
gzipped), see:
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param blobInfo the info for the blob to be uploaded
* @param buffer the byte array containing the data
* @param offset offset at which the blob contents start in the buffer
* @param blobSize the size
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, int blobSize, boolean failIfAlreadyExists)
throws IOException {
assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
final byte[] buffer = new byte[Math.toIntExact(blobSize)];
Streams.readFully(inputStream, buffer);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
new Storage.BlobTargetOption[0];
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, buffer, targetOptions));
() -> client().create(blobInfo, buffer, offset, blobSize, targetOptions));
// We don't track this operation on the http layer as
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {

/**
 * Builds the {@link GoogleCloudStorageBlobStore} for this repository from the
 * configured bucket, client name, storage service, big-arrays and buffer size.
 */
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
    final String repositoryName = metadata.name();
    return new GoogleCloudStorageBlobStore(
        bucket, clientName, repositoryName, storageService, bigArrays, bufferSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import fixture.gcs.FakeOAuth2HttpHandler;
import org.apache.http.HttpStatus;
import org.elasticsearch.jdk.JavaVersion;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -154,7 +155,7 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien

httpServer.createContext("/token", new FakeOAuth2HttpHandler());
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service,
randomIntBetween(1, 8) * 1024);
BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024);

return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -77,7 +78,7 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti
when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage);

try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService,
randomIntBetween(1, 8) * 1024)) {
BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024)) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);

IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs.iterator()));
Expand Down

0 comments on commit 6e9985f

Please sign in to comment.