diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannel.java index a3b75060df..7a47834610 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannel.java @@ -24,6 +24,7 @@ import com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; @@ -43,6 +44,9 @@ import com.google.protobuf.UInt32Value; import com.google.protobuf.util.Timestamps; import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; @@ -52,6 +56,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -64,12 +69,24 @@ public final class GoogleCloudStorageGrpcWriteChannel // Default GCS upload granularity. static final int GCS_MINIMUM_CHUNK_SIZE = 256 * 1024; - private static final Duration START_RESUMABLE_WRITE_TIMEOUT = Duration.ofMinutes(10); - private static final Duration QUERY_WRITE_STATUS_TIMEOUT = Duration.ofMinutes(10); - private static final Duration WRITE_STREAM_TIMEOUT = Duration.ofMinutes(20); + private static final Duration START_RESUMABLE_WRITE_TIMEOUT = Duration.ofMinutes(1); + private static final Duration QUERY_WRITE_STATUS_TIMEOUT = Duration.ofMinutes(1); + private static final Duration WRITE_STREAM_TIMEOUT = Duration.ofMinutes(10); // Maximum number of automatic retries for each data chunk // when writing to underlying channel raises error. - private static final int UPLOAD_RETRIES = 10; + private static final int UPLOAD_RETRIES = 5; + + // Number of insert requests to retain, in case we need to rewind and resume an upload. Using too + // small of a number could risk being unable to resume the write if the resume point is an + // already-discarded buffer; and setting the value too high wastes RAM. Note: We could have a + // more complex implementation that periodically queries the service to find out the last + // committed offset, to determine what's safe to discard, but that would also impose a performance + // penalty. + private static int NUMBER_OF_REQUESTS_TO_RETAIN = 5; + // A set that defines all transient errors on which retry can be attempted. + private static final ImmutableSet TRANSIENT_ERRORS = + ImmutableSet.of( + Code.DEADLINE_EXCEEDED, Code.RESOURCE_EXHAUSTED, Code.INTERNAL, Code.UNAVAILABLE); private final StorageStub stub; private final StorageResourceId resourceId; @@ -178,8 +195,12 @@ private Object doResumableUpload() throws IOException { long writeOffset = 0; int retriesAttempted = 0; Hasher objectHasher = Hashing.crc32c().newHasher(); + // Holds list of most recent number of NUMBER_OF_REQUESTS_TO_RETAIN requests, so upload can be + // rewound and re-sent upon non-transient errors. 
+ TreeMap dataChunkMap = new TreeMap<>(); do { - responseObserver = new InsertChunkResponseObserver(uploadId, writeOffset, objectHasher); + responseObserver = + new InsertChunkResponseObserver(uploadId, writeOffset, objectHasher, dataChunkMap); // TODO(b/151184800): Implement per-message timeout, in addition to stream timeout. stub.withDeadlineAfter(WRITE_STREAM_TIMEOUT.toMillis(), MILLISECONDS) .insertObject(responseObserver); @@ -191,17 +212,8 @@ private Object doResumableUpload() throws IOException { String.format("Resumable upload failed for '%s'", getResourceString()), e); } - if (responseObserver.hasError()) { - long committedSize = getCommittedWriteSize(uploadId); - // If the last upload completely failed then reset to where it's marked before last - // insert. Otherwise, uploaded data chunk needs to be skipped before reading from buffered - // input stream. - pipeSource.reset(); - if (committedSize > writeOffset) { - long uploadedDataChunkSize = committedSize - writeOffset; - pipeSource.skip(uploadedDataChunkSize); - writeOffset += uploadedDataChunkSize; - } + if (responseObserver.hasTransientError()) { + writeOffset = getCommittedWriteSize(uploadId); ++retriesAttempted; } else { writeOffset += responseObserver.bytesWritten(); @@ -210,11 +222,12 @@ private Object doResumableUpload() throws IOException { if (retriesAttempted >= UPLOAD_RETRIES) { throw new IOException( - String.format("Insert failed for '%s'", resourceId), responseObserver.getError()); + String.format( + "Too many retry attempts. Resumable upload failed for '%s'", resourceId)); } - } while (!responseObserver.hasFinalized() || responseObserver.hasError()); + } while (!responseObserver.hasFinalized()); - return responseObserver.getResponse(); + return responseObserver.getResponseOrThrow(); } /** Handler for responses from the Insert streaming RPC. */ @@ -223,23 +236,36 @@ private class InsertChunkResponseObserver private final long writeOffset; private final String uploadId; + // A map holding cached data chunks, keyed by the offset to start writing each data chunk. + private final TreeMap dataChunkMap; private volatile boolean objectFinalized = false; + // The last transient error to occur during the streaming RPC. + private Throwable transientError = null; + // The last non-transient error to occur during the streaming RPC. + private Throwable nonTransientError = null; // The last error to occur during the streaming RPC. Present only on error. private IOException error; // The response from the server, populated at the end of a successful streaming RPC. private Object response; private ByteString chunkData; private Hasher objectHasher; + // This flag is to avoid onReady handler of requestObserver from being called multiple times. + private boolean readyHandlerExecuted = false; // CountDownLatch tracking completion of the streaming RPC. Set on error, or once the request // stream is closed. 
final CountDownLatch done = new CountDownLatch(1); - InsertChunkResponseObserver(String uploadId, long writeOffset, Hasher objectHasher) { + InsertChunkResponseObserver( + String uploadId, + long writeOffset, + Hasher objectHasher, + TreeMap dataChunkMap) { this.uploadId = uploadId; this.chunkData = ByteString.EMPTY; this.writeOffset = writeOffset; this.objectHasher = objectHasher; + this.dataChunkMap = dataChunkMap; } @Override @@ -248,18 +274,42 @@ public void beforeStart(ClientCallStreamObserver requestObs new Runnable() { @Override public void run() { - if (objectFinalized) { + if (objectFinalized || readyHandlerExecuted) { // onReadyHandler may be called after we've closed the request half of the stream. return; } + readyHandlerExecuted = true; + + // Create new request builder if the writeOffset grows over inserts. Otherwise + // something went wrong and resume write from a cached request builder. + boolean shouldAttemptRetry = + dataChunkMap.size() > 0 && dataChunkMap.lastKey() >= writeOffset; + InsertObjectRequest.Builder requestBuilder = + shouldAttemptRetry + ? buildRequestFromCachedDataChunk(dataChunkMap) + : buildRequestFromPipeData(); + + if (requestBuilder == null) { + requestObserver.onError(nonTransientError); + return; + } + requestObserver.onNext(requestBuilder.build()); + requestObserver.onCompleted(); + } + + // Handles the case where we're not resuming a write, and instead we need to read data + // from the input pipe. + private InsertObjectRequest.Builder buildRequestFromPipeData() { try { chunkData = readRequestData(); } catch (IOException e) { - error = + nonTransientError = new IOException( - String.format("Failed to read chunk for '%s'", resourceId), e); - return; + String.format( + "InsertChunkResponseObserver.beforeStart for uploadId %s.", uploadId), + e); + return null; } InsertObjectRequest.Builder requestBuilder = @@ -294,45 +344,110 @@ public void run() { UInt32Value.newBuilder().setValue(objectHasher.hash().asInt()))); } } - requestObserver.onNext(requestBuilder.build()); - - if (objectFinalized) { - // Close the request half of the streaming RPC. - requestObserver.onCompleted(); + if (dataChunkMap.size() == NUMBER_OF_REQUESTS_TO_RETAIN) { + dataChunkMap.remove(dataChunkMap.firstKey()); } + dataChunkMap.put(writeOffset, chunkData); + return requestBuilder; } private ByteString readRequestData() throws IOException { - // Mark the input stream in case this request fails so that read can be recovered - // from where it's marked. - pipeSource.mark(MAX_BYTES_PER_MESSAGE); ByteString data = ByteString.readFrom(ByteStreams.limit(pipeSource, MAX_BYTES_PER_MESSAGE)); - - objectFinalized = - data.size() < MAX_BYTES_PER_MESSAGE || pipeSource.available() <= 0; + pipeSource.mark(1); + objectFinalized = data.size() < MAX_BYTES_PER_MESSAGE || pipeSource.read() == -1; + pipeSource.reset(); return data; } + + // Handles the case when a writeOffset of data read previously is being processed. + // This happens if a transient failure happens while uploading, and can be resumed by + // querying the current committed offset. + private InsertObjectRequest.Builder buildRequestFromCachedDataChunk( + TreeMap dataChunkMap) { + // Resume will only work if the first request builder in the cache carries an offset + // not greater than the current writeOffset. 
+ InsertObjectRequest.Builder requestBuilder = null; + if (dataChunkMap.size() > 0 && dataChunkMap.firstKey() <= writeOffset) { + for (Map.Entry entry : dataChunkMap.entrySet()) { + if (entry.getKey() + entry.getValue().size() > writeOffset) { + Long writeOffsetToResume = entry.getKey(); + chunkData = entry.getValue(); + requestBuilder = + buildRequestFromWriteOffsetAndDataChunk(writeOffsetToResume, chunkData); + break; + } + } + } + if (requestBuilder == null) { + nonTransientError = + new IOException( + String.format( + "Didn't have enough data buffered for attempt to resume upload for" + + " uploadID %s: last committed offset=%s, earliest buffered" + + " offset=%s. Upload must be restarted from the beginning.", + uploadId, writeOffset, dataChunkMap.firstKey())); + return null; + } + return requestBuilder; + } + + private InsertObjectRequest.Builder buildRequestFromWriteOffsetAndDataChunk( + long writeOffset, ByteString dataChunk) { + InsertObjectRequest.Builder requestBuilder = + InsertObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(writeOffset); + + if (dataChunk.size() > 0) { + ChecksummedData.Builder requestDataBuilder = + ChecksummedData.newBuilder().setContent(dataChunk); + if (checksumsEnabled) { + Hasher chunkHasher = Hashing.crc32c().newHasher(); + for (ByteBuffer buffer : dataChunk.asReadOnlyByteBufferList()) { + chunkHasher.putBytes(buffer); + } + requestDataBuilder.setCrc32C( + UInt32Value.newBuilder().setValue(chunkHasher.hash().asInt())); + } + requestBuilder.setChecksummedData(requestDataBuilder); + } + + if (dataChunk.size() < MAX_BYTES_PER_MESSAGE) { + objectFinalized = true; + requestBuilder.setFinishWrite(true); + if (checksumsEnabled) { + requestBuilder.setObjectChecksums( + ObjectChecksums.newBuilder() + .setCrc32C( + UInt32Value.newBuilder().setValue(objectHasher.hash().asInt()))); + } + } + + return requestBuilder; + } }); } - public Object getResponse() throws IOException { - if (hasError()) { - throw getError(); + public Object getResponseOrThrow() throws IOException { + if (hasNonTransientError()) { + throw new IOException( + String.format("Resumable upload failed for '%s'", getResourceString()), + nonTransientError); } return checkNotNull(response, "Response not present for '%s'", resourceId); } - boolean hasError() { - return error != null || response == null; + boolean hasTransientError() { + return transientError != null || response == null; } - int bytesWritten() { - return chunkData.size(); + boolean hasNonTransientError() { + return nonTransientError != null; } - public IOException getError() { - return checkNotNull(error, "Error not present for '%s'", resourceId); + int bytesWritten() { + return chunkData.size(); } boolean hasFinalized() { @@ -348,13 +463,27 @@ public void onNext(Object response) { public void onError(Throwable t) { Status s = Status.fromThrowable(t); String statusDesc = s == null ? "" : s.getDescription(); - error = - new IOException( - String.format( - "Caught exception for '%s', while uploading to uploadId %s at writeOffset %d." - + " Status: %s", - resourceId, uploadId, writeOffset, statusDesc), - t); + + if (t.getClass() == StatusException.class || t.getClass() == StatusRuntimeException.class) { + Code code = + t.getClass() == StatusException.class + ? 
((StatusException) t).getStatus().getCode() + : ((StatusRuntimeException) t).getStatus().getCode(); + if (TRANSIENT_ERRORS.contains(code)) { + transientError = t; + objectFinalized = false; + } + } + if (transientError == null) { + nonTransientError = + new IOException( + String.format( + "Caught exception for '%s', while uploading to uploadId %s at writeOffset %d." + + " Status: %s", + resourceId, uploadId, writeOffset, statusDesc), + t); + objectFinalized = true; + } done.countDown(); } @@ -406,8 +535,6 @@ private String startResumableUpload() throws IOException { SimpleResponseObserver responseObserver = new SimpleResponseObserver<>(); - runWithRetries( - () -> { stub.withDeadlineAfter(START_RESUMABLE_WRITE_TIMEOUT.toMillis(), MILLISECONDS) .startResumableWrite(request, responseObserver); try { @@ -418,8 +545,6 @@ private String startResumableUpload() throws IOException { String.format("Failed to start resumable upload for '%s'", getResourceString()), e); } - }, - responseObserver); return responseObserver.getResponse().getUploadId(); } @@ -431,8 +556,6 @@ private long getCommittedWriteSize(String uploadId) throws IOException { SimpleResponseObserver responseObserver = new SimpleResponseObserver<>(); - runWithRetries( - () -> { stub.withDeadlineAfter(QUERY_WRITE_STATUS_TIMEOUT.toMillis(), MILLISECONDS) .queryWriteStatus(request, responseObserver); try { @@ -443,8 +566,6 @@ private long getCommittedWriteSize(String uploadId) throws IOException { String.format("Failed to get committed write size for '%s'", getResourceString()), e); } - }, - responseObserver); return responseObserver.getResponse().getCommittedSize(); } diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannelTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannelTest.java index 2738b031b7..2148cf50b4 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannelTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcWriteChannelTest.java @@ -1,6 +1,7 @@ package com.google.cloud.hadoop.gcsio; import static com.google.common.truth.Truth.assertThat; +import static com.google.google.storage.v1.ServiceConstants.Values.MAX_WRITE_CHUNK_BYTES; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -16,6 +17,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.google.storage.v1.ChecksummedData; import com.google.google.storage.v1.InsertObjectRequest; import com.google.google.storage.v1.InsertObjectSpec; @@ -32,6 +34,8 @@ import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import com.google.protobuf.UInt32Value; +import io.grpc.Status; +import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -39,8 +43,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -343,23 +349,111 @@ public void writeHandlesErrorOnInsertRequestWithoutUncommittedData() throws Exce } @Test - 
public void writeHandlesErrorOnInsertRequestWithLongUncommittedData() throws Exception { - GoogleCloudStorageGrpcWriteChannel writeChannel = newWriteChannel(); - long chunkSize = GoogleCloudStorageGrpcWriteChannel.GCS_MINIMUM_CHUNK_SIZE * 1024L * 1024L; - fakeService.setInsertRequestException(new IOException("Error!")); - fakeService.setResumeFromInsertException(true); + public void writeOneChunkWithSingleErrorAndResume() throws Exception { + int chunkSize = GoogleCloudStorageGrpcWriteChannel.GCS_MINIMUM_CHUNK_SIZE; + AsyncWriteChannelOptions options = + AsyncWriteChannelOptions.builder().setUploadChunkSize(chunkSize).build(); + ObjectWriteConditions writeConditions = new ObjectWriteConditions(); + GoogleCloudStorageGrpcWriteChannel writeChannel = + newWriteChannel(options, writeConditions, Optional.absent()); + fakeService.setInsertObjectExceptions( + ImmutableList.of(new StatusException(Status.DEADLINE_EXCEEDED))); fakeService.setQueryWriteStatusResponses( + ImmutableList.of(QueryWriteStatusResponse.newBuilder().setCommittedSize(0).build()) + .iterator()); + ByteString chunk = createTestData(chunkSize); + List expectedRequests = + Arrays.asList( + InsertObjectRequest.newBuilder() + .setUploadId(UPLOAD_ID) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent(chunk) + .setCrc32C(UInt32Value.newBuilder().setValue(uInt32Value(1916767651)))) + .setObjectChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(UInt32Value.newBuilder().setValue(uInt32Value(1916767651)))) + .setFinishWrite(true) + .build()); + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(InsertObjectRequest.class); + + writeChannel.initialize(); + writeChannel.write(chunk.asReadOnlyByteBuffer()); + writeChannel.close(); + + verify(fakeService, times(1)).startResumableWrite(eq(START_REQUEST), any()); + verify(fakeService, times(1)).queryWriteStatus(eq(WRITE_STATUS_REQUEST), any()); + verify(fakeService.insertRequestObserver, atLeast(1)).onNext(requestCaptor.capture()); + // TODO(hgong): Figure out a way to check the expected requests and actual reqeusts builder. 
+    // assertEquals(expectedRequests, requestCaptor.getAllValues());
+    verify(fakeService.insertRequestObserver, atLeast(1)).onCompleted();
+  }
+
+  @Test
+  public void writeOneChunkWithSingleErrorFailedToResume() throws Exception {
+    int chunkSize = GoogleCloudStorageGrpcWriteChannel.GCS_MINIMUM_CHUNK_SIZE;
+    AsyncWriteChannelOptions options =
+        AsyncWriteChannelOptions.builder().setUploadChunkSize(chunkSize).build();
+    ObjectWriteConditions writeConditions = new ObjectWriteConditions();
+    GoogleCloudStorageGrpcWriteChannel writeChannel =
+        newWriteChannel(options, writeConditions, Optional.absent());
+    fakeService.setInsertObjectExceptions(
+        ImmutableList.of(new StatusException(Status.DEADLINE_EXCEEDED)));
+    fakeService.setQueryWriteStatusResponses(
+        ImmutableList.of(QueryWriteStatusResponse.newBuilder().setCommittedSize(-1).build())
+            .iterator());
+    ByteString chunk = createTestData(chunkSize);
+
+    writeChannel.initialize();
+    writeChannel.write(chunk.asReadOnlyByteBuffer());
+
+    assertThrows(IOException.class, writeChannel::close);
+  }
+
+  @Test
+  public void writeTwoChunksWithSingleErrorAndResume() throws Exception {
+    GoogleCloudStorageGrpcWriteChannel writeChannel = newWriteChannel();
+    fakeService.setInsertObjectExceptions(
         ImmutableList.of(
-            QueryWriteStatusResponse.newBuilder().setCommittedSize(chunkSize * 3 / 4).build())
+            new Throwable(), // Empty cause means don't throw for first insert request
+            new StatusException(Status.DEADLINE_EXCEEDED)));
+    fakeService.setQueryWriteStatusResponses(
+        ImmutableList.of(QueryWriteStatusResponse.newBuilder().setCommittedSize(0).build())
             .iterator());
+    ByteString stream_data = createTestData(2 * MAX_WRITE_CHUNK_BYTES.getNumber());
+    List<InsertObjectRequest> expectedRequests =
+        Arrays.asList(
+            InsertObjectRequest.newBuilder()
+                .setUploadId(UPLOAD_ID)
+                .setChecksummedData(
+                    ChecksummedData.newBuilder()
+                        .setContent(stream_data)
+                        .setCrc32C(UInt32Value.newBuilder().setValue(uInt32Value(1916767651))))
+                .build(),
+            InsertObjectRequest.newBuilder()
+                .setUploadId(UPLOAD_ID)
+                .setChecksummedData(
+                    ChecksummedData.newBuilder()
+                        .setContent(stream_data)
+                        .setCrc32C(UInt32Value.newBuilder().setValue(uInt32Value(1559022432))))
+                .setObjectChecksums(
+                    ObjectChecksums.newBuilder()
+                        .setCrc32C(UInt32Value.newBuilder().setValue(uInt32Value(1275609548))))
+                .setFinishWrite(true)
+                .build());
+    ArgumentCaptor<InsertObjectRequest> requestCaptor =
+        ArgumentCaptor.forClass(InsertObjectRequest.class);
-    ByteString data = createTestData(GoogleCloudStorageGrpcWriteChannel.GCS_MINIMUM_CHUNK_SIZE);
     writeChannel.initialize();
-    writeChannel.write(data.asReadOnlyByteBuffer());
+    writeChannel.write(stream_data.asReadOnlyByteBuffer());
     writeChannel.close();
     verify(fakeService, times(1)).startResumableWrite(eq(START_REQUEST), any());
-    verify(fakeService, atLeast(1)).queryWriteStatus(eq(WRITE_STATUS_REQUEST), any());
+    verify(fakeService, times(1)).queryWriteStatus(eq(WRITE_STATUS_REQUEST), any());
+    verify(fakeService.insertRequestObserver, atLeast(1)).onNext(requestCaptor.capture());
+    // TODO(hgong): Figure out a way to check the expected requests and actual requests builder.
+    // assertEquals(expectedRequests, requestCaptor.getAllValues());
     verify(fakeService.insertRequestObserver, atLeast(1)).onCompleted();
   }

@@ -527,6 +621,7 @@ private static class FakeService extends StorageImplBase {
     InsertRequestObserver insertRequestObserver = spy(new InsertRequestObserver());

     private Throwable startRequestException;
+    private List<Throwable> insertObjectExceptions;
     private Throwable queryWriteStatusException;
     private Iterator<QueryWriteStatusResponse> queryWriteStatusResponses;

@@ -560,6 +655,14 @@ public void queryWriteStatus(
     @Override
     public StreamObserver<InsertObjectRequest> insertObject(
         StreamObserver<Object> responseObserver) {
+      if (insertObjectExceptions != null && insertObjectExceptions.size() > 0) {
+        Throwable throwable = insertObjectExceptions.remove(0);
+        if (!throwable.getClass().isAssignableFrom(Throwable.class)
+            || throwable.getCause() != null) {
+          insertRequestObserver.insertRequestException = throwable;
+          insertRequestObserver.resumeFromInsertException = true;
+        }
+      }
       insertRequestObserver.responseObserver = responseObserver;
       return insertRequestObserver;
     }

@@ -584,8 +687,10 @@ void setInsertRequestException(Throwable t) {
       insertRequestObserver.insertRequestException = t;
     }

-    void setResumeFromInsertException(boolean resumable) {
-      insertRequestObserver.resumeFromInsertException = resumable;
+    public void setInsertObjectExceptions(List<Throwable> insertObjectExceptions) {
+      // Make a copy so caller can pass in an immutable list (this implementation needs to update
+      // the list).
+      this.insertObjectExceptions = Lists.newArrayList(insertObjectExceptions);
     }

     private static class InsertRequestObserver implements StreamObserver<InsertObjectRequest> {