Skip to content

Commit

Permalink
feat: port ParallelCompositeUploadBlobWriteSessionConfig to work with…
Browse files Browse the repository at this point in the history
… HttpStorageOptions (#2474)
  • Loading branch information
BenWhitehead authored Mar 28, 2024
1 parent d84e255 commit 3bf6026
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
* Break the stream of bytes into smaller part objects uploading each part in parallel. Then
* composing the parts together to make the ultimate object.
* </td>
* <td>gRPC</td>
* <td>gRPC, HTTP</td>
* <td>
* <ol>
* <li>
Expand Down Expand Up @@ -342,7 +342,7 @@ public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths
* @since 2.28.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() {
return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@
*/
@Immutable
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {

private static final int MAX_PARTS_PER_COMPOSE = 32;
private final int maxPartsPerCompose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
PART_INDEX.appendTo(partRange, builder);
OBJECT_OFFSET.appendTo(offset, builder);
b.setMetadata(builder.build());
// the value of a kms key name will contain the exact version when read from gcs
// however, gcs will not accept that version resource identifier when creating a new object
// strip it out, so it can be included as a query string parameter instead
b.setKmsKeyName(null);
b = partMetadataFieldDecorator.apply(b);
return b.build();
}
Expand Down Expand Up @@ -507,7 +511,11 @@ private ApiFuture<Boolean> deleteAsync(BlobId id) {
@VisibleForTesting
@NonNull
static Opts<ObjectTargetOpt> getPartOpts(Opts<ObjectTargetOpt> opts) {
return opts.filter(TO_EXCLUDE_FROM_PARTS).prepend(DOES_NOT_EXIST);
return opts.filter(TO_EXCLUDE_FROM_PARTS)
.prepend(DOES_NOT_EXIST)
// disable gzip transfer encoding for HTTP, it causes a significant bottleneck uploading
// the parts
.prepend(Opts.from(UnifiedOpts.disableGzipContent()));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
* Facade which makes an instance of {@link RewindableContent} appear as an input stream.
*
* <p>It does this by calling {@link RewindableContent#writeTo(GatheringByteChannel)} on an
* anonymous channel which closes over the read destination.
*/
final class RewindableContentInputStream extends InputStream {

private final RewindableContent content;

RewindableContentInputStream(RewindableContent content) {
this.content = content;
}

@Override
public int read() throws IOException {
byte[] tmp = new byte[1];
int read = read(tmp);
if (read == -1) {
return -1;
} else {
return tmp[0] & 0xFF;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
// define a byte buffer as the destination for our write
ByteBuffer dst = ByteBuffer.wrap(b, off, len);
int remaining = dst.remaining();
if (remaining == 0) {
return 0;
}
long written =
content.writeTo(
new AnonWritableByteChannel() {
@Override
public long write(ByteBuffer[] srcs, int offset, int length) {
// srcs here is the bytes of content
long total = 0;
for (int i = offset; i < length; i++) {
ByteBuffer src = srcs[i];
// copy what we can from our src to the dst buffer
long written = Buffers.copy(src, dst);
total += written;
}
return total;
}
});
// if the dst has space, but we didn't write anything means we didn't have anything to write
if (written == 0) {
return -1;
}
return Math.toIntExact(written);
}

private abstract static class AnonWritableByteChannel implements UnbufferedWritableByteChannel {

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1694,4 +1694,72 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOp
}
return codecs.blobInfo().decode(object);
}

@Override
public BlobInfo internalDirectUpload(BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {

BlobInfo.Builder builder =
info.toBuilder()
.setMd5(
BaseEncoding.base64().encode(Hashing.md5().hashBytes(buf.duplicate()).asBytes()))
.setCrc32c(
BaseEncoding.base64()
.encode(Ints.toByteArray(Hashing.crc32c().hashBytes(buf.duplicate()).asInt())));
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();

BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
final StorageObject encoded = codecs.blobInfo().encode(updated);
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsCreate(encoded, optionsMap);
RewindableContent content = RewindableContent.of(buf);
return run(
algorithm,
() -> {
content.rewindTo(0);
return storageRpc.create(encoded, new RewindableContentInputStream(content), optionsMap);
},
Conversions.json().blobInfo()::decode);
}

/**
* Behavioral difference compared to {@link #delete(BlobId, BlobSourceOption...)} instead of
* returning false when an object does not exist, we throw an exception.
*/
@Override
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
final StorageObject storageObject = codecs.blobId().encode(id);
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsDelete(storageObject, optionsMap);
return run(
algorithm,
() -> {
boolean deleted = storageRpc.delete(storageObject, optionsMap);
// HttpStorageRpc turns a 404 into false, our code needs to know 404
if (!deleted) {
throw new StorageException(404, "NOT_FOUND", null, null);
}
return null;
},
Function.identity());
}

@Override
public BlobInfo internalObjectGet(BlobId blobId, Opts<ObjectSourceOpt> opts) {
StorageObject storedObject = codecs.blobId().encode(blobId);
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsGet(storedObject, optionsMap);
return run(
algorithm,
() -> {
StorageObject storageObject = storageRpc.get(storedObject, optionsMap);
// HttpStorageRpc turns a 404 into null, our code needs to know 404
if (storageObject == null) {
throw new StorageException(404, "NOT_FOUND", null, null);
}
return storageObject;
},
codecs.blobInfo()::decode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.util.Data;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Compose;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.Storage.Objects.Insert;
import com.google.api.services.storage.model.Bucket;
Expand Down Expand Up @@ -755,13 +756,15 @@ public StorageObject compose(
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_COMPOSE);
Scope scope = tracer.withSpan(span);
try {
return storage
.objects()
.compose(target.getBucket(), target.getName(), request)
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions))
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions))
.setUserProject(Option.USER_PROJECT.getString(targetOptions))
.execute();
Compose compose =
storage
.objects()
.compose(target.getBucket(), target.getName(), request)
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions))
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions))
.setUserProject(Option.USER_PROJECT.getString(targetOptions));
setEncryptionHeaders(compose.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, targetOptions);
return compose.execute();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.cloud.storage.ByteSizeConstants._256KiB;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;

public final class RewindableContentInputStreamTest {

@Test
public void read_empty() throws IOException {
RewindableContent content = RewindableContent.empty();
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
int read = in.read();
assertThat(read).isEqualTo(-1);
}
}

@Test
public void readB_emptySrc() throws IOException {
RewindableContent content = RewindableContent.empty();
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
int read = in.read(new byte[1]);
assertThat(read).isEqualTo(-1);
}
}

@Test
public void readB_emptyDst() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
byte[] tmp = new byte[0];
int read = in.read(tmp);
assertThat(read).isEqualTo(0);
}
}

@Test
public void readB_singleByte() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
byte[] tmp = new byte[_256KiB];
int read = in.read(tmp);
assertThat(read).isEqualTo(1);
assertThat(tmp[0]).isEqualTo(bytes[0]);
}
}

@Test
public void read_singleByte() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
int read = in.read();
assertThat(read).isEqualTo(bytes[0]);
}
}

@Test
public void readB_multiContent() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(30);
RewindableContent content =
RewindableContent.of(
ByteBuffer.wrap(bytes, 0, 10),
ByteBuffer.wrap(bytes, 10, 10),
ByteBuffer.wrap(bytes, 20, 10));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
byte[] tmp = new byte[_256KiB];
int read = in.read(tmp);
assertThat(read).isEqualTo(30);
assertThat(xxd(ByteString.copyFrom(tmp, 0, read))).isEqualTo(xxd(bytes));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.GrpcStorageOptions;
import com.google.cloud.storage.HttpStorageOptions;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
Expand Down Expand Up @@ -73,7 +74,7 @@

@RunWith(StorageITRunner.class)
@CrossRun(
transports = {Transport.GRPC},
transports = {Transport.HTTP, Transport.GRPC},
backends = {Backend.PROD})
public final class ITParallelCompositeUploadBlobWriteSessionConfigTest {

Expand Down Expand Up @@ -125,6 +126,12 @@ public void setUp() throws Exception {
.toBuilder()
.setBlobWriteSessionConfig(pcu)
.build();
} else if (transport == Transport.HTTP) {
storageOptions =
((HttpStorageOptions) injectedStorage.getOptions())
.toBuilder()
.setBlobWriteSessionConfig(pcu)
.build();
}
assertWithMessage("unable to resolve options").that(storageOptions).isNotNull();
//noinspection DataFlowIssue
Expand Down

0 comments on commit 3bf6026

Please sign in to comment.