Rewrite retry mechanism for gRPC write in gcsio (#402)
hongyegong authored Jul 14, 2020
1 parent 6723d3e commit d51f2b6
Showing 2 changed files with 296 additions and 70 deletions.
@@ -24,6 +24,7 @@
import com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
@@ -43,6 +44,9 @@
import com.google.protobuf.UInt32Value;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
@@ -52,6 +56,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -64,12 +69,24 @@ public final class GoogleCloudStorageGrpcWriteChannel
// Default GCS upload granularity.
static final int GCS_MINIMUM_CHUNK_SIZE = 256 * 1024;

private static final Duration START_RESUMABLE_WRITE_TIMEOUT = Duration.ofMinutes(10);
private static final Duration QUERY_WRITE_STATUS_TIMEOUT = Duration.ofMinutes(10);
private static final Duration WRITE_STREAM_TIMEOUT = Duration.ofMinutes(20);
private static final Duration START_RESUMABLE_WRITE_TIMEOUT = Duration.ofMinutes(1);
private static final Duration QUERY_WRITE_STATUS_TIMEOUT = Duration.ofMinutes(1);
private static final Duration WRITE_STREAM_TIMEOUT = Duration.ofMinutes(10);
// Maximum number of automatic retries for each data chunk
// when writing to the underlying channel raises an error.
private static final int UPLOAD_RETRIES = 10;
private static final int UPLOAD_RETRIES = 5;

// Number of insert requests to retain, in case we need to rewind and resume an upload. Too small
// a value risks being unable to resume the write if the resume point falls in an already-discarded
// buffer; too large a value wastes RAM. Note: a more complex implementation could periodically
// query the service for the last committed offset to determine what is safe to discard, but that
// would also impose a performance penalty.
private static int NUMBER_OF_REQUESTS_TO_RETAIN = 5;
// A set that defines all transient errors on which retry can be attempted.
private static final ImmutableSet<Code> TRANSIENT_ERRORS =
ImmutableSet.of(
Code.DEADLINE_EXCEEDED, Code.RESOURCE_EXHAUSTED, Code.INTERNAL, Code.UNAVAILABLE);

private final StorageStub stub;
private final StorageResourceId resourceId;
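The TRANSIENT_ERRORS set above drives the retry decision in onError() further down: a streaming failure whose gRPC status code is in this set is recorded as transient and retried, anything else is surfaced as a non-transient IOException. A minimal, self-contained sketch of that classification, assuming a hypothetical helper isTransient() that is not part of the class:

    import com.google.common.collect.ImmutableSet;
    import io.grpc.Status;
    import io.grpc.Status.Code;

    final class TransientErrorSketch {
      private static final ImmutableSet<Code> TRANSIENT_ERRORS =
          ImmutableSet.of(
              Code.DEADLINE_EXCEEDED, Code.RESOURCE_EXHAUSTED, Code.INTERNAL, Code.UNAVAILABLE);

      // True when the throwable carries a gRPC status code that is worth retrying.
      static boolean isTransient(Throwable t) {
        return TRANSIENT_ERRORS.contains(Status.fromThrowable(t).getCode());
      }
    }

Note that the committed onError() checks the exception class against StatusException and StatusRuntimeException explicitly rather than calling Status.fromThrowable(), which would map unrecognized throwables to Code.UNKNOWN and likewise treat them as non-transient.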
@@ -178,8 +195,12 @@ private Object doResumableUpload() throws IOException {
long writeOffset = 0;
int retriesAttempted = 0;
Hasher objectHasher = Hashing.crc32c().newHasher();
// Holds the most recent NUMBER_OF_REQUESTS_TO_RETAIN data chunks, so the upload can be
// rewound and re-sent after a transient error.
TreeMap<Long, ByteString> dataChunkMap = new TreeMap<>();
do {
responseObserver = new InsertChunkResponseObserver(uploadId, writeOffset, objectHasher);
responseObserver =
new InsertChunkResponseObserver(uploadId, writeOffset, objectHasher, dataChunkMap);
// TODO(b/151184800): Implement per-message timeout, in addition to stream timeout.
stub.withDeadlineAfter(WRITE_STREAM_TIMEOUT.toMillis(), MILLISECONDS)
.insertObject(responseObserver);
@@ -191,17 +212,8 @@
String.format("Resumable upload failed for '%s'", getResourceString()), e);
}

if (responseObserver.hasError()) {
long committedSize = getCommittedWriteSize(uploadId);
// If the last upload completely failed then reset to where it's marked before last
// insert. Otherwise, uploaded data chunk needs to be skipped before reading from buffered
// input stream.
pipeSource.reset();
if (committedSize > writeOffset) {
long uploadedDataChunkSize = committedSize - writeOffset;
pipeSource.skip(uploadedDataChunkSize);
writeOffset += uploadedDataChunkSize;
}
if (responseObserver.hasTransientError()) {
writeOffset = getCommittedWriteSize(uploadId);
++retriesAttempted;
} else {
writeOffset += responseObserver.bytesWritten();
@@ -210,11 +222,12 @@

if (retriesAttempted >= UPLOAD_RETRIES) {
throw new IOException(
String.format("Insert failed for '%s'", resourceId), responseObserver.getError());
String.format(
"Too many retry attempts. Resumable upload failed for '%s'", resourceId));
}
} while (!responseObserver.hasFinalized() || responseObserver.hasError());
} while (!responseObserver.hasFinalized());

return responseObserver.getResponse();
return responseObserver.getResponseOrThrow();
}
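In outline, the rewritten retry loop above distinguishes the two failure modes: on a transient error it asks the service for the committed size, rewinds writeOffset to that point and replays from the chunk cache, giving up after UPLOAD_RETRIES attempts; on success it simply advances writeOffset. A simplified, self-contained sketch of that control flow, where AttemptResult, attemptChunkUpload() and getCommittedWriteSize() are illustrative stand-ins rather than the real types and methods:

    import java.io.IOException;

    interface AttemptResult {
      boolean hasTransientError();
      boolean finalized();
      int bytesWritten();
      Object responseOrThrow() throws IOException;
    }

    abstract class ResumableUploadSketch {
      static final int UPLOAD_RETRIES = 5;

      abstract AttemptResult attemptChunkUpload(String uploadId, long writeOffset)
          throws IOException;

      abstract long getCommittedWriteSize(String uploadId) throws IOException;

      Object upload(String uploadId) throws IOException {
        long writeOffset = 0;
        int retriesAttempted = 0;
        AttemptResult result;
        do {
          result = attemptChunkUpload(uploadId, writeOffset);
          if (result.hasTransientError()) {
            // Resume from whatever the service has durably committed so far.
            writeOffset = getCommittedWriteSize(uploadId);
            ++retriesAttempted;
          } else {
            writeOffset += result.bytesWritten();
          }
          if (retriesAttempted >= UPLOAD_RETRIES) {
            throw new IOException("Too many retry attempts for uploadId " + uploadId);
          }
        } while (!result.finalized());
        return result.responseOrThrow();
      }
    }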

/** Handler for responses from the Insert streaming RPC. */
@@ -223,23 +236,36 @@ private class InsertChunkResponseObserver

private final long writeOffset;
private final String uploadId;
// A map holding cached data chunks, keyed by the offset to start writing each data chunk.
private final TreeMap<Long, ByteString> dataChunkMap;
private volatile boolean objectFinalized = false;
// The last transient error to occur during the streaming RPC.
private Throwable transientError = null;
// The last non-transient error to occur during the streaming RPC.
private Throwable nonTransientError = null;
// The last error to occur during the streaming RPC. Present only on error.
private IOException error;
// The response from the server, populated at the end of a successful streaming RPC.
private Object response;
private ByteString chunkData;
private Hasher objectHasher;
// This flag prevents the requestObserver's onReady handler from running more than once.
private boolean readyHandlerExecuted = false;

// CountDownLatch tracking completion of the streaming RPC. Set on error, or once the request
// stream is closed.
final CountDownLatch done = new CountDownLatch(1);

InsertChunkResponseObserver(String uploadId, long writeOffset, Hasher objectHasher) {
InsertChunkResponseObserver(
String uploadId,
long writeOffset,
Hasher objectHasher,
TreeMap<Long, ByteString> dataChunkMap) {
this.uploadId = uploadId;
this.chunkData = ByteString.EMPTY;
this.writeOffset = writeOffset;
this.objectHasher = objectHasher;
this.dataChunkMap = dataChunkMap;
}

@Override
@@ -248,18 +274,42 @@ public void beforeStart(ClientCallStreamObserver<InsertObjectRequest> requestObs
new Runnable() {
@Override
public void run() {
if (objectFinalized) {
if (objectFinalized || readyHandlerExecuted) {
// onReadyHandler may be called after we've closed the request half of the stream.
return;
}
readyHandlerExecuted = true;

// Build a new request from pipe data when writeOffset has advanced past every cached chunk.
// Otherwise something went wrong, so resume the write from a cached data chunk.
boolean shouldAttemptRetry =
dataChunkMap.size() > 0 && dataChunkMap.lastKey() >= writeOffset;
InsertObjectRequest.Builder requestBuilder =
shouldAttemptRetry
? buildRequestFromCachedDataChunk(dataChunkMap)
: buildRequestFromPipeData();

if (requestBuilder == null) {
requestObserver.onError(nonTransientError);
return;
}

requestObserver.onNext(requestBuilder.build());
requestObserver.onCompleted();
}

// Handles the case where we're not resuming a write, and instead we need to read data
// from the input pipe.
private InsertObjectRequest.Builder buildRequestFromPipeData() {
try {
chunkData = readRequestData();
} catch (IOException e) {
error =
nonTransientError =
new IOException(
String.format("Failed to read chunk for '%s'", resourceId), e);
return;
String.format(
"InsertChunkResponseObserver.beforeStart for uploadId %s.", uploadId),
e);
return null;
}

InsertObjectRequest.Builder requestBuilder =
Expand Down Expand Up @@ -294,45 +344,110 @@ public void run() {
UInt32Value.newBuilder().setValue(objectHasher.hash().asInt())));
}
}
requestObserver.onNext(requestBuilder.build());

if (objectFinalized) {
// Close the request half of the streaming RPC.
requestObserver.onCompleted();
if (dataChunkMap.size() == NUMBER_OF_REQUESTS_TO_RETAIN) {
dataChunkMap.remove(dataChunkMap.firstKey());
}
dataChunkMap.put(writeOffset, chunkData);
return requestBuilder;
}

private ByteString readRequestData() throws IOException {
// Mark the input stream in case this request fails so that read can be recovered
// from where it's marked.
pipeSource.mark(MAX_BYTES_PER_MESSAGE);
ByteString data =
ByteString.readFrom(ByteStreams.limit(pipeSource, MAX_BYTES_PER_MESSAGE));

objectFinalized =
data.size() < MAX_BYTES_PER_MESSAGE || pipeSource.available() <= 0;
pipeSource.mark(1);
objectFinalized = data.size() < MAX_BYTES_PER_MESSAGE || pipeSource.read() == -1;
pipeSource.reset();
return data;
}
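The end-of-object check above now uses a one-byte read probe instead of available(): available() can return 0 even when the upstream writer simply has not produced the next bytes yet, whereas a blocking read() only returns -1 once the stream is genuinely exhausted, and mark(1)/reset() puts the probed byte back. A standalone sketch of the same idiom on an ordinary BufferedInputStream (the class and method names are illustrative):

    import java.io.BufferedInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    final class EndOfStreamProbe {
      // Peeks one byte to decide whether the stream is exhausted, then rewinds so the
      // peeked byte is not lost to the next reader.
      static boolean atEndOfStream(BufferedInputStream in) throws IOException {
        in.mark(1);
        boolean exhausted = in.read() == -1;
        in.reset();
        return exhausted;
      }

      public static void main(String[] args) throws IOException {
        BufferedInputStream in =
            new BufferedInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
        System.out.println(atEndOfStream(in)); // false: three bytes remain
        while (in.read() != -1) {} // drain the stream
        System.out.println(atEndOfStream(in)); // true: nothing left to read
      }
    }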

// Handles the case where the current writeOffset falls within data that was read previously.
// This happens after a transient failure while uploading; the write is resumed from the
// committed offset reported by the service.
private InsertObjectRequest.Builder buildRequestFromCachedDataChunk(
TreeMap<Long, ByteString> dataChunkMap) {
// Resuming only works if the earliest cached data chunk starts at an offset no greater than
// the current writeOffset.
InsertObjectRequest.Builder requestBuilder = null;
if (dataChunkMap.size() > 0 && dataChunkMap.firstKey() <= writeOffset) {
for (Map.Entry<Long, ByteString> entry : dataChunkMap.entrySet()) {
if (entry.getKey() + entry.getValue().size() > writeOffset) {
Long writeOffsetToResume = entry.getKey();
chunkData = entry.getValue();
requestBuilder =
buildRequestFromWriteOffsetAndDataChunk(writeOffsetToResume, chunkData);
break;
}
}
}
if (requestBuilder == null) {
nonTransientError =
new IOException(
String.format(
"Didn't have enough data buffered for attempt to resume upload for"
+ " uploadID %s: last committed offset=%s, earliest buffered"
+ " offset=%s. Upload must be restarted from the beginning.",
uploadId, writeOffset, dataChunkMap.firstKey()));
return null;
}
return requestBuilder;
}
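Together with the eviction in buildRequestFromPipeData() above, this lookup amounts to a small offset-keyed cache of recent chunks. A self-contained sketch of that structure, where ChunkCache, add() and findResumePoint() are illustrative names rather than anything in the commit:

    import com.google.protobuf.ByteString;
    import java.util.Map;
    import java.util.TreeMap;

    // Keeps only the most recent chunks, keyed by the write offset at which each starts,
    // so a resumable upload can be rewound to the last offset the service committed.
    final class ChunkCache {
      private static final int NUMBER_OF_REQUESTS_TO_RETAIN = 5;
      private final TreeMap<Long, ByteString> chunks = new TreeMap<>();

      // Caches a chunk, evicting the oldest one once the retention limit is reached.
      void add(long writeOffset, ByteString data) {
        if (chunks.size() == NUMBER_OF_REQUESTS_TO_RETAIN) {
          chunks.remove(chunks.firstKey());
        }
        chunks.put(writeOffset, data);
      }

      // Finds the cached chunk whose byte range still covers committedOffset, or null if
      // that data has already been evicted and the upload cannot be resumed.
      Map.Entry<Long, ByteString> findResumePoint(long committedOffset) {
        if (chunks.isEmpty() || chunks.firstKey() > committedOffset) {
          return null;
        }
        for (Map.Entry<Long, ByteString> entry : chunks.entrySet()) {
          if (entry.getKey() + entry.getValue().size() > committedOffset) {
            return entry;
          }
        }
        return null;
      }
    }

For example, with 2 MiB chunks cached at offsets 0, 2 MiB and 4 MiB and a committed size of 5 MiB, findResumePoint returns the entry at offset 4 MiB, and the resend starts from the beginning of that chunk.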

private InsertObjectRequest.Builder buildRequestFromWriteOffsetAndDataChunk(
long writeOffset, ByteString dataChunk) {
InsertObjectRequest.Builder requestBuilder =
InsertObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(writeOffset);

if (dataChunk.size() > 0) {
ChecksummedData.Builder requestDataBuilder =
ChecksummedData.newBuilder().setContent(dataChunk);
if (checksumsEnabled) {
Hasher chunkHasher = Hashing.crc32c().newHasher();
for (ByteBuffer buffer : dataChunk.asReadOnlyByteBufferList()) {
chunkHasher.putBytes(buffer);
}
requestDataBuilder.setCrc32C(
UInt32Value.newBuilder().setValue(chunkHasher.hash().asInt()));
}
requestBuilder.setChecksummedData(requestDataBuilder);
}

if (dataChunk.size() < MAX_BYTES_PER_MESSAGE) {
objectFinalized = true;
requestBuilder.setFinishWrite(true);
if (checksumsEnabled) {
requestBuilder.setObjectChecksums(
ObjectChecksums.newBuilder()
.setCrc32C(
UInt32Value.newBuilder().setValue(objectHasher.hash().asInt())));
}
}

return requestBuilder;
}
});
}
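When checksums are enabled, both request-building paths above attach a CRC32C over the chunk contents, and the finishing message additionally carries the running object checksum. A minimal sketch of the per-chunk hash, mirroring the asReadOnlyByteBufferList() loop above (ChunkChecksums is an illustrative name):

    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;
    import com.google.protobuf.ByteString;
    import java.nio.ByteBuffer;

    final class ChunkChecksums {
      // Computes the CRC32C of a chunk without copying the bytes out of the ByteString.
      static int crc32c(ByteString chunk) {
        Hasher hasher = Hashing.crc32c().newHasher();
        for (ByteBuffer buffer : chunk.asReadOnlyByteBufferList()) {
          hasher.putBytes(buffer);
        }
        return hasher.hash().asInt();
      }
    }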

public Object getResponse() throws IOException {
if (hasError()) {
throw getError();
public Object getResponseOrThrow() throws IOException {
if (hasNonTransientError()) {
throw new IOException(
String.format("Resumable upload failed for '%s'", getResourceString()),
nonTransientError);
}
return checkNotNull(response, "Response not present for '%s'", resourceId);
}

boolean hasError() {
return error != null || response == null;
boolean hasTransientError() {
return transientError != null || response == null;
}

int bytesWritten() {
return chunkData.size();
boolean hasNonTransientError() {
return nonTransientError != null;
}

public IOException getError() {
return checkNotNull(error, "Error not present for '%s'", resourceId);
int bytesWritten() {
return chunkData.size();
}

boolean hasFinalized() {
@@ -348,13 +463,27 @@ public void onNext(Object response) {
public void onError(Throwable t) {
Status s = Status.fromThrowable(t);
String statusDesc = s == null ? "" : s.getDescription();
error =
new IOException(
String.format(
"Caught exception for '%s', while uploading to uploadId %s at writeOffset %d."
+ " Status: %s",
resourceId, uploadId, writeOffset, statusDesc),
t);

if (t.getClass() == StatusException.class || t.getClass() == StatusRuntimeException.class) {
Code code =
t.getClass() == StatusException.class
? ((StatusException) t).getStatus().getCode()
: ((StatusRuntimeException) t).getStatus().getCode();
if (TRANSIENT_ERRORS.contains(code)) {
transientError = t;
objectFinalized = false;
}
}
if (transientError == null) {
nonTransientError =
new IOException(
String.format(
"Caught exception for '%s', while uploading to uploadId %s at writeOffset %d."
+ " Status: %s",
resourceId, uploadId, writeOffset, statusDesc),
t);
objectFinalized = true;
}
done.countDown();
}

@@ -406,8 +535,6 @@ private String startResumableUpload() throws IOException {

SimpleResponseObserver<StartResumableWriteResponse> responseObserver =
new SimpleResponseObserver<>();
runWithRetries(
() -> {
stub.withDeadlineAfter(START_RESUMABLE_WRITE_TIMEOUT.toMillis(), MILLISECONDS)
.startResumableWrite(request, responseObserver);
try {
Expand All @@ -418,8 +545,6 @@ private String startResumableUpload() throws IOException {
String.format("Failed to start resumable upload for '%s'", getResourceString()),
e);
}
},
responseObserver);

return responseObserver.getResponse().getUploadId();
}
@@ -431,8 +556,6 @@ private long getCommittedWriteSize(String uploadId) throws IOException {

SimpleResponseObserver<QueryWriteStatusResponse> responseObserver =
new SimpleResponseObserver<>();
runWithRetries(
() -> {
stub.withDeadlineAfter(QUERY_WRITE_STATUS_TIMEOUT.toMillis(), MILLISECONDS)
.queryWriteStatus(request, responseObserver);
try {
Expand All @@ -443,8 +566,6 @@ private long getCommittedWriteSize(String uploadId) throws IOException {
String.format("Failed to get committed write size for '%s'", getResourceString()),
e);
}
},
responseObserver);

return responseObserver.getResponse().getCommittedSize();
}