Make BlobReadChannel fail if content is updated #390

Merged
merged 3 commits on Nov 19, 2015
Changes from 2 commits

@@ -410,8 +410,8 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) {
}

@Override
public byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
throws StorageException {
public Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position,
int bytes) throws StorageException {
try {
Get req = storage.objects()
.get(from.getBucket(), from.getName())
@@ -420,12 +420,13 @@ public byte[] read(StorageObject from, Map<Option, ?> options, long position, in
.setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(options))
.setIfGenerationMatch(IF_GENERATION_MATCH.getLong(options))
.setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(options));
MediaHttpDownloader downloader = req.getMediaHttpDownloader();
downloader.setContentRange(position, Ints.checkedCast(position + bytes - 1));
downloader.setDirectDownloadEnabled(true);
StringBuilder range = new StringBuilder();
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
req.getRequestHeaders().setRange(range.toString());
ByteArrayOutputStream output = new ByteArrayOutputStream();
req.executeMediaAndDownloadTo(output);
return output.toByteArray();
req.executeMedia().download(output);
String etag = req.getLastResponseHeaders().getETag();
return Tuple.of(etag, output.toByteArray());
} catch (IOException ex) {
throw translate(ex);
}

@@ -249,7 +249,7 @@ StorageObject compose(Iterable<StorageObject> sources, StorageObject target,
byte[] load(StorageObject storageObject, Map<Option, ?> options)
throws StorageException;

byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes)
throws StorageException;

String open(StorageObject object, Map<Option, ?> options) throws StorageException;

@@ -26,9 +26,8 @@
/**
* A channel for reading data from a Google Cloud Storage object.
*
* Implementations of this class may buffer data internally to reduce remote calls.
*
* This class is @{link Serializable}, which allows incremental reads.
* Implementations of this class may buffer data internally to reduce remote calls. This interface
* implements {@link Restorable} to allow saving the reader's state to continue reading afterwards.
*/
public interface BlobReadChannel extends ReadableByteChannel, Closeable,
Restorable<BlobReadChannel> {
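
The javadoc above points at Restorable rather than plain serialization. Here is a minimal sketch of the save/restore round trip it enables, assuming the capture()/restore() pair declared by Restorable and RestorableState; the buffer sizes and the idea of persisting the captured state elsewhere are illustrative, not part of this change:

```java
import com.google.gcloud.RestorableState;
import com.google.gcloud.storage.BlobReadChannel;

import java.io.IOException;
import java.nio.ByteBuffer;

class ResumableReadSketch {

  // Read one chunk, capture the channel's state, and later resume reading
  // from the same position through a channel restored from that state.
  static void readInTwoSteps(BlobReadChannel reader) throws IOException {
    ByteBuffer first = ByteBuffer.allocate(64 * 1024);
    reader.read(first);

    // The state object is Serializable, so it could be persisted and restored
    // in a different process.
    RestorableState<BlobReadChannel> saved = reader.capture();
    reader.close();

    try (BlobReadChannel resumed = saved.restore()) {
      ByteBuffer second = ByteBuffer.allocate(64 * 1024);
      resumed.read(second);
    }
  }
}
```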

@@ -23,6 +23,7 @@
import com.google.gcloud.RestorableState;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.spi.StorageRpc;
import com.google.gcloud.spi.StorageRpc.Tuple;

import java.io.IOException;
import java.io.Serializable;
@@ -41,6 +42,7 @@ class BlobReadChannelImpl implements BlobReadChannel {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private String lastEtag;
private int position;
private boolean isOpen;
private boolean endOfStream;
@@ -117,12 +119,19 @@ public int read(ByteBuffer byteBuffer) throws IOException {
}
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
try {
buffer = runWithRetries(new Callable<byte[]>() {
Tuple<String, byte[]> result = runWithRetries(new Callable<Tuple<String, byte[]>>() {
@Override
public byte[] call() {
public Tuple<String, byte[]> call() {
return storageRpc.read(storageObject, requestOptions, position, toRead);
}
}, serviceOptions.retryParams(), StorageImpl.EXCEPTION_HANDLER);
if (lastEtag != null && !Objects.equals(result.x(), lastEtag)) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Blob ").append(blob).append(" was updated while reading");
throw new StorageException(0, messageBuilder.toString(), false);
}
lastEtag = result.x();
buffer = result.y();
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
@@ -152,6 +161,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final String lastEtag;
private final int position;
private final boolean isOpen;
private final boolean endOfStream;
@@ -161,6 +171,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
this.serviceOptions = builder.serviceOptions;
this.blob = builder.blob;
this.requestOptions = builder.requestOptions;
this.lastEtag = builder.lastEtag;
this.position = builder.position;
this.isOpen = builder.isOpen;
this.endOfStream = builder.endOfStream;
@@ -171,6 +182,7 @@ static class Builder {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private String lastEtag;
private int position;
private boolean isOpen;
private boolean endOfStream;
@@ -182,6 +194,11 @@ private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> r
this.requestOptions = reqOptions;
}

Builder lastEtag(String lastEtag) {
this.lastEtag = lastEtag;
return this;
}

Builder position(int position) {
this.position = position;
return this;
@@ -215,6 +232,7 @@ static Builder builder(
@Override
public BlobReadChannel restore() {
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
channel.lastEtag = lastEtag;
channel.position = position;
channel.isOpen = isOpen;
channel.endOfStream = endOfStream;
@@ -224,8 +242,8 @@ public BlobReadChannel restore() {

@Override
public int hashCode() {
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
chunkSize);
return Objects.hash(serviceOptions, blob, requestOptions, lastEtag, position, isOpen,
endOfStream, chunkSize);
}

@Override
@@ -240,6 +258,7 @@ public boolean equals(Object obj) {
return Objects.equals(this.serviceOptions, other.serviceOptions)
&& Objects.equals(this.blob, other.blob)
&& Objects.equals(this.requestOptions, other.requestOptions)
&& Objects.equals(this.lastEtag, other.lastEtag)
&& this.position == other.position
&& this.isOpen == other.isOpen
&& this.endOfStream == other.endOfStream
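
The heart of this file's change is the per-chunk ETag comparison: remember the ETag that came back with the previous chunk and fail fast when a later chunk carries a different one, since that means a new generation was written mid-read. A standalone sketch of just that check (the class name and exception type are illustrative placeholders, not the PR's types):

```java
import java.util.Objects;

// Illustrative sketch of the consistency check BlobReadChannelImpl.read now
// performs on every fetched chunk.
class EtagConsistencyCheck {

  private String lastEtag;

  // Returns the chunk if its ETag matches the one seen previously; the first
  // chunk establishes the baseline.
  byte[] accept(String etag, byte[] chunk) {
    if (lastEtag != null && !Objects.equals(etag, lastEtag)) {
      throw new IllegalStateException("Blob was updated while reading");
    }
    lastEtag = etag;
    return chunk;
  }
}
```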

@@ -26,8 +26,8 @@
* A channel for writing data to a Google Cloud Storage object.
*
* Implementations of this class may further buffer data internally to reduce remote calls. Written
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
* incremental writes.
* data will only be visible after calling {@link #close()}. This interface implements
* {@link Restorable} to allow saving the writer's state to continue writing afterwards.
*/
public interface BlobWriteChannel extends WritableByteChannel, Closeable,
Restorable<BlobWriteChannel> {
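
A short usage sketch of the writer contract restated above, assuming only the write()/close() methods exercised in this PR's tests; the payload handling is illustrative. Nothing written through the channel is visible until close() returns, and the new generation created at that point is exactly what makes a concurrent BlobReadChannel fail:

```java
import com.google.gcloud.storage.BlobWriteChannel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class WriteVisibilitySketch {

  // Writes a small payload; per the javadoc above, readers will not see any
  // of it until close() returns.
  static void writeAll(BlobWriteChannel writer, String payload) throws IOException {
    ByteBuffer bytes = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
    while (bytes.hasRemaining()) {
      writer.write(bytes);
    }
    writer.close(); // the object's new generation becomes visible here
  }
}
```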

@@ -33,6 +33,7 @@
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -1405,14 +1406,20 @@ private static void checkContentType(BlobInfo blobInfo) throws IllegalArgumentEx
BatchResponse apply(BatchRequest batchRequest);

/**
* Return a channel for reading the blob's content.
* Return a channel for reading the blob's content. The blob's latest generation is read. If the
* blob changes while reading (i.e. {@link BlobInfo#etag()} changes), subsequent calls to
* {@link BlobReadChannel#read(ByteBuffer)} may throw {@link StorageException}.
*
* @throws StorageException upon failure
*/
BlobReadChannel reader(String bucket, String blob, BlobSourceOption... options);

/**
* Return a channel for reading the blob's content.
* Return a channel for reading the blob's content. If {@code blob.generation()} is set,
* data corresponding to that generation is read. If {@code blob.generation()} is {@code null},
* the blob's latest generation is read. If the blob changes while reading (i.e.
* {@link BlobInfo#etag()} changes), subsequent calls to {@link BlobReadChannel#read(ByteBuffer)}
* may throw {@link StorageException}.
*
* @throws StorageException upon failure
*/
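
To make the updated reader() javadoc concrete, here is a hedged sketch of a caller that reads a whole blob and simply restarts when the blob is replaced mid-read. A Storage instance is assumed to be available; the unbounded retry and 64 KB buffer are illustrative simplifications, and real code would distinguish this failure from other StorageExceptions and bound the retries:

```java
import com.google.gcloud.storage.BlobReadChannel;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

class ConsistentReadSketch {

  // Reads the blob's full content; if a chunk comes back from a different
  // generation, the channel throws StorageException and we start over, so the
  // caller never sees a mix of old and new bytes.
  static byte[] readAll(Storage storage, String bucket, String blobName) throws IOException {
    while (true) {
      try (BlobReadChannel reader = storage.reader(bucket, blobName)) {
        ByteArrayOutputStream content = new ByteArrayOutputStream();
        ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
        while (reader.read(chunk) >= 0) {
          chunk.flip();
          content.write(chunk.array(), 0, chunk.limit());
          chunk.clear();
        }
        return content.toByteArray();
      } catch (StorageException ex) {
        // Blob was updated while reading; retry from the beginning.
      }
    }
  }
}
```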

@@ -19,7 +19,6 @@
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertArrayEquals;
@@ -46,7 +45,7 @@ public class BlobReadChannelImplTest {

private static final String BUCKET_NAME = "b";
private static final String BLOB_NAME = "n";
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME);
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME, -1L);
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024;
@@ -88,7 +87,7 @@ public void testReadBuffered() throws IOException {
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
replay(storageRpcMock);
reader.read(firstReadBuffer);
reader.read(secondReadBuffer);
@@ -107,10 +106,11 @@ public void testReadBig() throws IOException {
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE);
expectLastCall().andReturn(firstResult);
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE);
expectLastCall().andReturn(secondResult);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
expect(storageRpcMock.read(
BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
replay(storageRpcMock);
reader.read(firstReadBuffer);
reader.read(secondReadBuffer);
@@ -125,7 +125,7 @@ public void testReadFinish() throws IOException {
byte[] result = {};
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
replay(storageRpcMock);
assertEquals(-1, reader.read(readBuffer));
}
@@ -137,7 +137,7 @@ public void testSeek() throws IOException {
byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
replay(storageRpcMock);
reader.read(readBuffer);
assertArrayEquals(result, readBuffer.array());
@@ -166,16 +166,42 @@ public void testReadClosed() {
}
}

@Test
public void testReadGenerationChanged() throws IOException {
BlobId blobId = BlobId.of(BUCKET_NAME, BLOB_NAME);
reader = new BlobReadChannelImpl(options, blobId, EMPTY_RPC_OPTIONS);
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(blobId.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag1", firstResult));
expect(storageRpcMock.read(
blobId.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag2", secondResult));
replay(storageRpcMock);
reader.read(firstReadBuffer);
try {
reader.read(secondReadBuffer);
fail("Expected BlobReadChannel read to throw StorageException");
} catch (StorageException ex) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Blob ").append(blobId).append(" was updated while reading");
assertEquals(messageBuilder.toString(), ex.getMessage());
// expected
}
}

@Test
public void testSaveAndRestore() throws IOException, ClassNotFoundException {
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(firstResult);
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
.andReturn(secondResult);
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
replay(storageRpcMock);
reader = new BlobReadChannelImpl(options, BLOB_ID, EMPTY_RPC_OPTIONS);
reader.read(firstReadBuffer);

@@ -49,6 +49,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -730,6 +731,39 @@ public void testReadChannelFail() throws IOException {
assertTrue(storage.delete(BUCKET, blobName));
}

@Test
public void testReadChannelFailUpdatedGeneration() throws IOException {
String blobName = "test-read-blob-fail-updated-generation";
BlobInfo blob = BlobInfo.builder(BUCKET, blobName).build();
Random random = new Random();
int chunkSize = 1024;
int blobSize = 2 * chunkSize;
byte[] content = new byte[blobSize];
random.nextBytes(content);
BlobInfo remoteBlob = storage.create(blob, content);
assertNotNull(remoteBlob);
assertEquals(blobSize, (long) remoteBlob.size());
try (BlobReadChannel reader = storage.reader(blob.blobId())) {
reader.chunkSize(chunkSize);
ByteBuffer readBytes = ByteBuffer.allocate(chunkSize);
int numReadBytes = reader.read(readBytes);
assertEquals(chunkSize, numReadBytes);
assertArrayEquals(Arrays.copyOf(content, chunkSize), readBytes.array());
try (BlobWriteChannel writer = storage.writer(blob)) {
byte[] newContent = new byte[blobSize];
random.nextBytes(newContent);
int numWrittenBytes = writer.write(ByteBuffer.wrap(newContent));
assertEquals(blobSize, numWrittenBytes);
}
readBytes = ByteBuffer.allocate(chunkSize);
reader.read(readBytes);
fail("StorageException was expected");
} catch (StorageException ex) {
// expected
}
assertTrue(storage.delete(BUCKET, blobName));
}

@Test
public void testWriteChannelFail() throws IOException {
String blobName = "test-write-channel-blob-fail";

@@ -1030,7 +1030,7 @@ public void testReaderWithOptions() throws IOException {
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
EasyMock.expect(
storageRpcMock.read(BLOB_INFO2.toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
EasyMock.replay(storageRpcMock);
storage = options.service();
BlobReadChannel channel = storage.reader(BUCKET_NAME1, BLOB_NAME2, BLOB_SOURCE_GENERATION,
@@ -1045,7 +1045,7 @@ public void testReaderWithOptionsFromBlobId() throws IOException {
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
EasyMock.expect(
storageRpcMock.read(BLOB_INFO1.blobId().toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
EasyMock.replay(storageRpcMock);
storage = options.service();
BlobReadChannel channel = storage.reader(BLOB_INFO1.blobId(),