-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Storage : Fix manage resumeable signedURL uploads. #4874
Changes from 10 commits
fc7ae19
e8e975d
08e0967
b512176
5f78509
422aa1d
b34a6c6
398d370
fd13f06
d3fbff7
9c0d0e4
0ee885e
28464d1
0b9be72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2068,6 +2068,13 @@ Blob create( | |
*/ | ||
WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options); | ||
|
||
/** | ||
* accept signURL and return a channel for writing content. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you make this more consistent with the other documentation in this class? i.e. present tense ("Accepts" instead of "accept") and provide an example? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @JesseLovelace changes done with example |
||
* | ||
* @throws StorageException upon failure | ||
*/ | ||
WriteChannel writer(URL signedURL); | ||
|
||
/** | ||
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a | ||
* fixed amount of time, you can use this method to generate a URL that is only valid within a | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -289,6 +289,13 @@ StorageObject compose( | |
*/ | ||
String open(StorageObject object, Map<Option, ?> options); | ||
|
||
/** | ||
* Opens a resumable upload channel for a given signUrl. | ||
* | ||
* @throws StorageException upon failure | ||
*/ | ||
String open(String signURL); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be signedURL There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @JesseLovelace Changes Done. |
||
|
||
/** | ||
* Writes the provided bytes to a storage object at the provided location. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,9 @@ | |
import com.google.cloud.storage.spi.v1.StorageRpc; | ||
import com.google.common.collect.ImmutableMap; | ||
import java.io.IOException; | ||
import java.net.MalformedURLException; | ||
import java.net.SocketException; | ||
import java.net.URL; | ||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
|
@@ -60,6 +62,8 @@ public class BlobWriteChannelTest { | |
private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE; | ||
private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE; | ||
private static final Random RANDOM = new Random(); | ||
private static final String SIGNED_URL = | ||
"http://www.test.com/test-bucket/[email protected]&Expires=1553839761&Signature=MJUBXAZ7"; | ||
|
||
@Rule public ExpectedException thrown = ExpectedException.none(); | ||
|
||
|
@@ -265,6 +269,133 @@ public void testStateEquals() { | |
assertEquals(state.toString(), state2.toString()); | ||
} | ||
|
||
@Test | ||
public void testWriteWithSignedURLAndWithoutFlush() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
replay(storageRpcMock); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE))); | ||
} | ||
|
||
@Test | ||
public void testWriteWithSignedURLAndWithFlush() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
Capture<byte[]> capturedBuffer = Capture.newInstance(); | ||
storageRpcMock.write( | ||
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false)); | ||
replay(storageRpcMock); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
writer.setChunkSize(CUSTOM_CHUNK_SIZE); | ||
ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE); | ||
assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer)); | ||
assertArrayEquals(buffer.array(), capturedBuffer.getValue()); | ||
} | ||
|
||
@Test | ||
public void testWriteWithSignedURLAndFlush() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
Capture<byte[]> capturedBuffer = Capture.newInstance(); | ||
storageRpcMock.write( | ||
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false)); | ||
replay(storageRpcMock); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE]; | ||
for (int i = 0; i < buffers.length; i++) { | ||
buffers[i] = randomBuffer(MIN_CHUNK_SIZE); | ||
assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i])); | ||
} | ||
for (int i = 0; i < buffers.length; i++) { | ||
assertArrayEquals( | ||
buffers[i].array(), | ||
Arrays.copyOfRange( | ||
capturedBuffer.getValue(), MIN_CHUNK_SIZE * i, MIN_CHUNK_SIZE * (i + 1))); | ||
} | ||
} | ||
|
||
@Test | ||
public void testCloseWithSignedURLWithoutFlush() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
Capture<byte[]> capturedBuffer = Capture.newInstance(); | ||
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); | ||
replay(storageRpcMock); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
assertTrue(writer.isOpen()); | ||
writer.close(); | ||
assertArrayEquals(new byte[0], capturedBuffer.getValue()); | ||
assertTrue(!writer.isOpen()); | ||
} | ||
|
||
@Test | ||
public void testCloseWithSignedURLWithFlush() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
Capture<byte[]> capturedBuffer = Capture.newInstance(); | ||
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); | ||
storageRpcMock.write( | ||
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true)); | ||
replay(storageRpcMock); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
assertTrue(writer.isOpen()); | ||
writer.write(buffer); | ||
writer.close(); | ||
assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length); | ||
assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE)); | ||
assertTrue(!writer.isOpen()); | ||
} | ||
|
||
@Test | ||
public void testWriteWithSignedURLClosed() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
Capture<byte[]> capturedBuffer = Capture.newInstance(); | ||
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); | ||
replay(storageRpcMock); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
writer.close(); | ||
try { | ||
writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)); | ||
fail("Expected BlobWriteChannel write to throw IOException"); | ||
} catch (IOException ex) { | ||
// expected | ||
} | ||
} | ||
|
||
@Test | ||
public void testSaveAndRestoreWithSignedURL() throws IOException { | ||
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); | ||
Capture<byte[]> capturedBuffer = Capture.newInstance(CaptureType.ALL); | ||
Capture<Long> capturedPosition = Capture.newInstance(CaptureType.ALL); | ||
storageRpcMock.write( | ||
eq(UPLOAD_ID), | ||
capture(capturedBuffer), | ||
eq(0), | ||
captureLong(capturedPosition), | ||
eq(DEFAULT_CHUNK_SIZE), | ||
eq(false)); | ||
expectLastCall().times(2); | ||
replay(storageRpcMock); | ||
ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE); | ||
ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1)); | ||
assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0)); | ||
assertEquals(new Long(0L), capturedPosition.getValues().get(0)); | ||
RestorableState<WriteChannel> writerState = writer.capture(); | ||
WriteChannel restoredWriter = writerState.restore(); | ||
assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2)); | ||
assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1)); | ||
assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1)); | ||
} | ||
|
||
@Test | ||
public void testRuntimeExceptionWithSignedURL() throws MalformedURLException { | ||
String exceptionMessage = "invalid signedURL"; | ||
expect(new BlobWriteChannel(options, new URL(SIGNED_URL))) | ||
.andThrow(new RuntimeException(exceptionMessage)); | ||
replay(storageRpcMock); | ||
thrown.expect(StorageException.class); | ||
thrown.expectMessage(exceptionMessage); | ||
writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
} | ||
|
||
private static ByteBuffer randomBuffer(int size) { | ||
byte[] byteArray = new byte[size]; | ||
RANDOM.nextBytes(byteArray); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,7 @@ | |
import java.io.ByteArrayInputStream; | ||
import java.io.IOException; | ||
import java.io.UnsupportedEncodingException; | ||
import java.net.MalformedURLException; | ||
import java.net.URL; | ||
import java.net.URLDecoder; | ||
import java.nio.ByteBuffer; | ||
|
@@ -319,6 +320,9 @@ public class StorageImplTest { | |
+ "EkPPhszldvQTY486uPxyD/D7HdfnGW/Nbw5JUhfvecAdudDEhNAQ3PNabyDMI+TpiHy4NTWOrgdcWrzj6VXcdc" | ||
+ "+uuABnPwRCdcyJ1xl2kOrPksRnp1auNGMLOe4IpEBjGY7baX9UG8+A45MbG0aHmkR59Op/aR9XowIDAQAB"; | ||
|
||
private static final String SIGNED_URL = | ||
"http://www.test.com/test-bucket/[email protected]&Expires=1553839761&Signature=MJUBXAZ7"; | ||
|
||
private static final ApiClock TIME_SOURCE = | ||
new ApiClock() { | ||
@Override | ||
|
@@ -2835,4 +2839,14 @@ public void testRuntimeException() { | |
thrown.expectMessage(exceptionMessage); | ||
storage.get(blob); | ||
} | ||
|
||
@Test | ||
public void testWriterWithSignedURL() throws MalformedURLException { | ||
EasyMock.expect(storageRpcMock.open(SIGNED_URL)).andReturn("upload-id"); | ||
EasyMock.replay(storageRpcMock); | ||
initializeService(); | ||
WriteChannel writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); | ||
assertNotNull(writer); | ||
assertTrue(writer.isOpen()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This wouldn't be compatible with V4 Signing (which will become default in the future), which starts with X-Goog-Algorithm instead of GoogleAccessId. See #4692
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JesseLovelace V4 Signing support changes Done