Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage : Fix manage resumeable signedURL uploads. #4874

Merged
merged 14 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ public String open(StorageObject object, Map<Option, ?> options) throws StorageE
return fullname(object);
}

@Override
public String open(String signedURL) {
return null;
}

@Override
public void write(
String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length, boolean last)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;

Expand All @@ -34,10 +35,18 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
this(options, blob, open(options, blob, optionsMap));
}

BlobWriteChannel(StorageOptions options, URL signedURL) {
this(options, open(signedURL, options));
}

BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) {
super(options, blobInfo, uploadId);
}

BlobWriteChannel(StorageOptions options, String uploadId) {
super(options, null, uploadId);
}

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
Expand Down Expand Up @@ -83,6 +92,46 @@ public String call() {
}
}

private static String open(final URL signedURL, final StorageOptions options) {
try {
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
if (!isValidSignedURL(signedURL.getQuery())) {
throw new StorageException(2, "invalid signedURL");
}
return options.getStorageRpcV1().open(signedURL.toString());
}
},
options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

private static boolean isValidSignedURL(String signedURLQuery) {
boolean isValid = true;
if (signedURLQuery.startsWith("X-Goog-Algorithm=")) {
if (!signedURLQuery.contains("&X-Goog-Credential=")
|| !signedURLQuery.contains("&X-Goog-Date=")
|| !signedURLQuery.contains("&X-Goog-Expires=")
|| !signedURLQuery.contains("&X-Goog-SignedHeaders=")
|| !signedURLQuery.contains("&X-Goog-Signature=")) {
isValid = false;
}
} else if (signedURLQuery.startsWith("GoogleAccessId=")) {
if (!signedURLQuery.contains("&Expires=") || !signedURLQuery.contains("&Signature=")) {
isValid = false;
}
} else {
isValid = false;
}
return isValid;
}

static class StateImpl extends BaseWriteChannel.BaseState<StorageOptions, BlobInfo> {

private static final long serialVersionUID = -9028324143780151286L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,27 @@ Blob create(
*/
WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options);

/**
* Accepts signed URL and return a channel for writing content.
*
* <p>Example of writing content through a writer using signed URL.
*
* <pre>{@code
* String bucketName = "my_unique_bucket";
* String blobName = "my_blob_name";
* BlobId blobId = BlobId.of(bucketName, blobName);
* byte[] content = "Hello, World!".getBytes(UTF_8);
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
* URL signedURL = storage.signUrl(blobInfo, 1, TimeUnit.HOURS, Storage.SignUrlOption.httpMethod(HttpMethod.POST));
* try (WriteChannel writer = storage.writer(signedURL)) {
* writer.write(ByteBuffer.wrap(content, 0, content.length));
* }
* }</pre>
*
* @throws StorageException upon failure
*/
WriteChannel writer(URL signedURL);

/**
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a
* fixed amount of time, you can use this method to generate a URL that is only valid within a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ public BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
return writer(targetOptions.x(), targetOptions.y());
}

@Override
public BlobWriteChannel writer(URL signedURL) {
return new BlobWriteChannel(getOptions(), signedURL);
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannel(getOptions(), blobInfo, optionsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,39 @@ public String open(StorageObject object, Map<Option, ?> options) {
}
}

@Override
public String open(String signedURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Scope scope = tracer.withSpan(span);
try {
GenericUrl url = new GenericUrl(signedURL);
url.set("uploadType", "resumable");
String bytesArrayParameters = "";
byte[] bytesArray = new byte[bytesArrayParameters.length()];
HttpRequestFactory requestFactory = storage.getRequestFactory();
HttpRequest httpRequest =
requestFactory.buildPostRequest(
url, new ByteArrayContent("", bytesArray, 0, bytesArray.length));
HttpHeaders requestHeaders = httpRequest.getHeaders();
requestHeaders.set("X-Upload-Content-Type", "");
requestHeaders.set("x-goog-resumable", "start");
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end();
}
}

@Override
public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN_REWRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ StorageObject compose(
*/
String open(StorageObject object, Map<Option, ?> options);

/**
* Opens a resumable upload channel for a given signedURL.
*
* @throws StorageException upon failure
*/
String open(String signedURL);

/**
* Writes the provided bytes to a storage object at the provided location.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
Expand All @@ -60,6 +62,8 @@ public class BlobWriteChannelTest {
private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE;
private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE;
private static final Random RANDOM = new Random();
private static final String SIGNED_URL =
"http://www.test.com/test-bucket/[email protected]&Expires=1553839761&Signature=MJUBXAZ7";

@Rule public ExpectedException thrown = ExpectedException.none();

Expand Down Expand Up @@ -265,6 +269,133 @@ public void testStateEquals() {
assertEquals(state.toString(), state2.toString());
}

@Test
public void testWriteWithSignedURLAndWithoutFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}

@Test
public void testWriteWithSignedURLAndWithFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
writer.setChunkSize(CUSTOM_CHUNK_SIZE);
ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE);
assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer));
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithSignedURLAndFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = randomBuffer(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i]));
}
for (int i = 0; i < buffers.length; i++) {
assertArrayEquals(
buffers[i].array(),
Arrays.copyOfRange(
capturedBuffer.getValue(), MIN_CHUNK_SIZE * i, MIN_CHUNK_SIZE * (i + 1)));
}
}

@Test
public void testCloseWithSignedURLWithoutFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertTrue(writer.isOpen());
writer.close();
assertArrayEquals(new byte[0], capturedBuffer.getValue());
assertTrue(!writer.isOpen());
}

@Test
public void testCloseWithSignedURLWithFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertTrue(writer.isOpen());
writer.write(buffer);
writer.close();
assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length);
assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE));
assertTrue(!writer.isOpen());
}

@Test
public void testWriteWithSignedURLClosed() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
writer.close();
try {
writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE));
fail("Expected BlobWriteChannel write to throw IOException");
} catch (IOException ex) {
// expected
}
}

@Test
public void testSaveAndRestoreWithSignedURL() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance(CaptureType.ALL);
Capture<Long> capturedPosition = Capture.newInstance(CaptureType.ALL);
storageRpcMock.write(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
captureLong(capturedPosition),
eq(DEFAULT_CHUNK_SIZE),
eq(false));
expectLastCall().times(2);
replay(storageRpcMock);
ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE);
ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1));
assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0));
assertEquals(new Long(0L), capturedPosition.getValues().get(0));
RestorableState<WriteChannel> writerState = writer.capture();
WriteChannel restoredWriter = writerState.restore();
assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2));
assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1));
assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1));
}

@Test
public void testRuntimeExceptionWithSignedURL() throws MalformedURLException {
String exceptionMessage = "invalid signedURL";
expect(new BlobWriteChannel(options, new URL(SIGNED_URL)))
.andThrow(new RuntimeException(exceptionMessage));
replay(storageRpcMock);
thrown.expect(StorageException.class);
thrown.expectMessage(exceptionMessage);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
}

private static ByteBuffer randomBuffer(int size) {
byte[] byteArray = new byte[size];
RANDOM.nextBytes(byteArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -319,6 +320,9 @@ public class StorageImplTest {
+ "EkPPhszldvQTY486uPxyD/D7HdfnGW/Nbw5JUhfvecAdudDEhNAQ3PNabyDMI+TpiHy4NTWOrgdcWrzj6VXcdc"
+ "+uuABnPwRCdcyJ1xl2kOrPksRnp1auNGMLOe4IpEBjGY7baX9UG8+A45MbG0aHmkR59Op/aR9XowIDAQAB";

private static final String SIGNED_URL =
"http://www.test.com/test-bucket/[email protected]&Expires=1553839761&Signature=MJUBXAZ7";

private static final ApiClock TIME_SOURCE =
new ApiClock() {
@Override
Expand Down Expand Up @@ -2835,4 +2839,14 @@ public void testRuntimeException() {
thrown.expectMessage(exceptionMessage);
storage.get(blob);
}

@Test
public void testWriterWithSignedURL() throws MalformedURLException {
EasyMock.expect(storageRpcMock.open(SIGNED_URL)).andReturn("upload-id");
EasyMock.replay(storageRpcMock);
initializeService();
WriteChannel writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertNotNull(writer);
assertTrue(writer.isOpen());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2542,4 +2542,28 @@ public void testEnableAndDisableBucketPolicyOnlyOnExistingBucket() throws Except
RemoteStorageHelper.forceDelete(storage, bpoBucket, 1, TimeUnit.MINUTES);
}
}

@Test
public void testUploadUsingSignedURL() throws Exception {
String blobName = "test-signed-url-upload";
BlobInfo blob = BlobInfo.newBuilder(BUCKET, blobName).build();
assertNotNull(storage.create(blob));
URL signUrl =
storage.signUrl(blob, 1, TimeUnit.HOURS, Storage.SignUrlOption.httpMethod(HttpMethod.POST));
byte[] bytesArrayToUpload = BLOB_STRING_CONTENT.getBytes();
try (WriteChannel writer = storage.writer(signUrl)) {
writer.write(ByteBuffer.wrap(bytesArrayToUpload, 0, bytesArrayToUpload.length));
}

int lengthOfDownLoadBytes = -1;
BlobId blobId = BlobId.of(BUCKET, blobName);
Blob blobToRead = storage.get(blobId);
try (ReadChannel reader = blobToRead.reader()) {
ByteBuffer bytes = ByteBuffer.allocate(64 * 1024);
lengthOfDownLoadBytes = reader.read(bytes);
}

assertEquals(bytesArrayToUpload.length, lengthOfDownLoadBytes);
assertTrue(storage.delete(BUCKET, blobName));
}
}