Skip to content

Commit

Permalink
Change
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Jan 27, 2024
1 parent 27a1baf commit e374dd3
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
metadata.name(),
storageService,
bigArrays,
randomIntBetween(1, 8) * 1024
randomIntBetween(1, 8) * 1024,
randomBoolean()
) {
@Override
long getLargeBlobThresholdInBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
private final int bufferSize;
private final boolean defaultGCSContentEncoding;
private final BigArrays bigArrays;

GoogleCloudStorageBlobStore(
Expand All @@ -120,6 +121,18 @@ class GoogleCloudStorageBlobStore implements BlobStore {
GoogleCloudStorageService storageService,
BigArrays bigArrays,
int bufferSize
) {
this(bucketName, clientName, repositoryName, storageService, bigArrays, bufferSize, ENABLE_DEFAULT_GCP_COMPRESSION);
}

GoogleCloudStorageBlobStore(
String bucketName,
String clientName,
String repositoryName,
GoogleCloudStorageService storageService,
BigArrays bigArrays,
int bufferSize,
boolean defaultGCSContentEncoding
) {
this.bucketName = bucketName;
this.clientName = clientName;
Expand All @@ -128,6 +141,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
this.bigArrays = bigArrays;
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
this.bufferSize = bufferSize;
this.defaultGCSContentEncoding = defaultGCSContentEncoding;
}

private Storage client() throws IOException {
Expand Down Expand Up @@ -486,7 +500,7 @@ private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, in
try {
final Storage.BlobTargetOption[] targetOptions;
// By default GCS will gzip compress uploads. We disable this unless the system property to re-enable this has been configured
if (ENABLE_DEFAULT_GCP_COMPRESSION == false) {
if (defaultGCSContentEncoding == false) {
targetOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_COMPRESSION : OVERWRITE_NO_COMPRESSION;
} else {
targetOptions = failIfAlreadyExists ? NO_OVERWRITE_COMPRESSION : OVERWRITE_COMPRESSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ StorageOptions createStorageOptions(
"repo",
service,
BigArrays.NON_RECYCLING_INSTANCE,
randomIntBetween(1, 8) * 1024
randomIntBetween(1, 8) * 1024,
randomBoolean()
);

return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);
Expand Down Expand Up @@ -203,7 +204,10 @@ public void testWriteBlobWithRetries() throws Exception {
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
if (countDown.countDown()) {
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(exchange.getRequestBody());
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(
exchange.getRequestHeaders(),
exchange.getRequestBody()
);
assertThat(content, isPresent());
assertThat(content.get().v1(), equalTo(blobContainer.path().buildAsString() + "write_blob_max_retries"));
if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti
"repo",
storageService,
BigArrays.NON_RECYCLING_INSTANCE,
randomIntBetween(1, 8) * 1024
randomIntBetween(1, 8) * 1024,
randomBoolean()
)
) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package fixture.gcs;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

Expand Down Expand Up @@ -179,7 +180,10 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
// Multipart upload
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(requestBody.streamInput());
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(
exchange.getRequestHeaders(),
requestBody.streamInput()
);
if (content.isPresent()) {
blobs.put(content.get().v1(), content.get().v2());

Expand Down Expand Up @@ -281,10 +285,13 @@ private static String httpServerUrl(final HttpExchange exchange) {

private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\"");

public static Optional<Tuple<String, BytesReference>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
public static Optional<Tuple<String, BytesReference>> parseMultipartRequestBody(Headers requestHeaders, final InputStream requestBody)
throws IOException {
List<String> encoding = requestHeaders.get("Content-encoding");
String contentEncoding = encoding == null ? null : encoding.get(0);
Tuple<String, BytesReference> content = null;
final BytesReference fullRequestBody;
try (InputStream in = new GZIPInputStream(requestBody)) {
try (InputStream in = "gzip".equals(contentEncoding) ? new GZIPInputStream(requestBody) : requestBody) {
fullRequestBody = Streams.readFully(in);
}
String name = null;
Expand Down

0 comments on commit e374dd3

Please sign in to comment.