
Commit

format and documentation
yihanzhen committed Apr 16, 2018
1 parent 067a190 commit a6ca4c1
Showing 3 changed files with 177 additions and 152 deletions.
@@ -37,7 +36,6 @@
import com.google.cloud.spanner.Options.ListOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated;
import com.google.common.annotations.VisibleForTesting;
@@ -191,8 +190,11 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}

private static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer.getCurrentSpan().addAnnotation("Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
tracer
.getCurrentSpan()
.addAnnotation(
"Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
new Context.CancellationListener() {
@@ -234,7 +236,8 @@ static <T> T runWithRetries(Callable<T> callable) {
while (true) {
attempt++;
try {
span.addAnnotation("Starting operation",
span.addAnnotation(
"Starting operation",
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
T result = callable.call();
return result;
@@ -389,7 +392,8 @@ Object value() {
return ImmutableMap.copyOf(tmp);
}

private static <T extends Message> T unpack(Any response, Class<T> clazz) throws SpannerException {
private static <T extends Message> T unpack(Any response, Class<T> clazz)
throws SpannerException {
try {
return response.unpack(clazz);
} catch (InvalidProtocolBufferException e) {
@@ -398,7 +402,7 @@ private static <T extends Message> T unpack(Any response, Class<T> clazz) throws
}
}

private static abstract class PageFetcher<S, T> implements NextPageFetcher<S> {
private abstract static class PageFetcher<S, T> implements NextPageFetcher<S> {
private String nextPageToken;

@Override
@@ -794,12 +798,12 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
Mutation.toProto(mutations, mutationsProto);
final CommitRequest request =
CommitRequest.newBuilder()
.setSession(name)
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
.setSession(name)
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
Span span = tracer.spanBuilder(COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
CommitResponse response =
@@ -889,11 +893,11 @@ ByteString beginTransaction() {
try (Scope s = tracer.withSpan(span)) {
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(name)
.setOptions(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
.setSession(name)
.setOptions(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
Transaction txn =
runWithRetries(
new Callable<Transaction>() {
@@ -955,8 +959,8 @@ private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPref
this(session, rpc, defaultPrefetchChunks, Tracing.getTracer().getCurrentSpan());
}

private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks,
Span span) {
private AbstractReadContext(
SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks, Span span) {
this.session = session;
this.rpc = rpc;
this.defaultPrefetchChunks = defaultPrefetchChunks;
@@ -1056,15 +1060,17 @@ ResultSet executeQueryInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
return new CloseableServerStreamIterator<PartialResultSet>(rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
return new CloseableServerStreamIterator<PartialResultSet>(
rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too many changes at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
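
Note on the streaming hunk above: resume is deliberately left non-functional for now (see the TODO); the request is rebuilt with setResumeToken, but the stream is not yet restarted on failure. Purely as an illustration, and not part of this commit, here is a minimal sketch of the resume-token bookkeeping a later change could wire up (the helper name and shape are hypothetical):

import com.google.protobuf.ByteString;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import java.util.Iterator;
import java.util.function.Function;

/** Hypothetical helper, illustration only: remember the last resume token while consuming a stream. */
final class ResumeTokenSketch {
  static void consume(
      ExecuteSqlRequest request,
      Function<ExecuteSqlRequest, Iterator<PartialResultSet>> startStream) {
    ByteString resumeToken = null;
    Iterator<PartialResultSet> stream = startStream.apply(request);
    while (stream.hasNext()) {
      PartialResultSet chunk = stream.next();
      if (!chunk.getResumeToken().isEmpty()) {
        // Latest point the server says the stream can safely be resumed from.
        resumeToken = chunk.getResumeToken();
      }
      // ... hand the chunk to the result set assembler ...
    }
    // On a retryable failure, a resuming caller would rebuild the request with
    // request.toBuilder().setResumeToken(resumeToken).build() and start a new stream.
  }
}
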
@@ -1165,15 +1171,17 @@ ResultSet readInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
return new CloseableServerStreamIterator<PartialResultSet>(rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
return new CloseableServerStreamIterator<PartialResultSet>(
rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too many changes at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
@@ -1226,8 +1234,8 @@ void backoffSleep(Context context, long backoffMillis) {
this.span = Tracing.getTracer().getCurrentSpan();
ByteString transactionId = session.readyTransactionId;
session.readyTransactionId = null;
this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks,
span);
this.txn =
new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks, span);
}

TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
@@ -1258,7 +1266,8 @@ private <T> T runInternal(TransactionCallable<T> callable) {
attempt++;
// TODO(user): When using streaming reads, consider using the first read to begin
// the txn.
span.addAnnotation("Starting Transaction Attempt",
span.addAnnotation(
"Starting Transaction Attempt",
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
txn.ensureTxn();

@@ -1270,7 +1279,8 @@ private <T> T runInternal(TransactionCallable<T> callable) {
} catch (Exception e) {
txnLogger.log(Level.FINE, "User-provided TransactionCallable raised exception", e);
if (txn.isAborted()) {
span.addAnnotation("Transaction Attempt Aborted in user operation. Retrying",
span.addAnnotation(
"Transaction Attempt Aborted in user operation. Retrying",
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
shouldRollback = false;
backoff(context, backoff);
@@ -1282,10 +1292,12 @@
} else {
toThrow = newSpannerException(ErrorCode.UNKNOWN, e.getMessage(), e);
}
span.addAnnotation("Transaction Attempt Failed in user operation",
span.addAnnotation(
"Transaction Attempt Failed in user operation",
ImmutableMap.<String, AttributeValue>builder()
.putAll(TraceUtil.getExceptionAnnotations(toThrow))
.put("Attempt", AttributeValue.longAttributeValue(attempt)).build());
.putAll(TraceUtil.getExceptionAnnotations(toThrow))
.put("Attempt", AttributeValue.longAttributeValue(attempt))
.build());
throw toThrow;
} finally {
if (shouldRollback) {
@@ -1295,19 +1307,23 @@

try {
txn.commit();
span.addAnnotation("Transaction Attempt Succeeded",
span.addAnnotation(
"Transaction Attempt Succeeded",
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
return result;
} catch (AbortedException e) {
txnLogger.log(Level.FINE, "Commit aborted", e);
span.addAnnotation("Transaction Attempt Aborted in Commit. Retrying",
span.addAnnotation(
"Transaction Attempt Aborted in Commit. Retrying",
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
backoff(context, backoff);
} catch (SpannerException e) {
span.addAnnotation("Transaction Attempt Failed in Commit",
span.addAnnotation(
"Transaction Attempt Failed in Commit",
ImmutableMap.<String, AttributeValue>builder()
.putAll(TraceUtil.getExceptionAnnotations(e))
.put("Attempt", AttributeValue.longAttributeValue(attempt)).build());
.putAll(TraceUtil.getExceptionAnnotations(e))
.put("Attempt", AttributeValue.longAttributeValue(attempt))
.build());
throw e;
}
}
@@ -1326,8 +1342,8 @@ public void invalidate() {
private void backoff(Context context, BackOff backoff) {
long delay = txn.getRetryDelayInMillis(backoff);
txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks, span);
span.addAnnotation("Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay)));
span.addAnnotation(
"Backing off", ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay)));
sleeper.backoffSleep(context, delay);
}
}
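
For context on the backoff() helper above: it computes the next delay, swaps in a fresh TransactionContextImpl, records a trace annotation, and sleeps. Purely as an illustration (not this class's API), a minimal retry loop against the same com.google.api.client.util.BackOff interface, with a plain Thread.sleep standing in for the cancellation-aware backoffSleep:

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;

/** Simplified sketch, illustration only: the retry-with-backoff shape used above. */
final class BackoffSketch {
  static void runWithBackoff(Runnable attempt) throws Exception {
    BackOff backoff = new ExponentialBackOff.Builder().setInitialIntervalMillis(10).build();
    while (true) {
      try {
        attempt.run();
        return;
      } catch (RuntimeException retryable) {
        long delayMillis = backoff.nextBackOffMillis();
        if (delayMillis == BackOff.STOP) {
          throw retryable; // retry budget exhausted
        }
        Thread.sleep(delayMillis); // the real code uses a cancellation-aware sleep
      }
    }
  }
}
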
@@ -1362,8 +1378,10 @@ void ensureTxn() {
span.addAnnotation("Creating Transaction");
try {
transactionId = session.beginTransaction();
span.addAnnotation("Transaction Creation Done", ImmutableMap.of("Id",
AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
span.addAnnotation(
"Transaction Creation Done",
ImmutableMap.of(
"Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
txnLogger.log(
Level.FINER,
"Started transaction {0}",
@@ -1373,9 +1391,10 @@ void ensureTxn() {
throw e;
}
} else {
span.addAnnotation("Transaction Initialized",
ImmutableMap.of("Id", AttributeValue.stringAttributeValue(
transactionId.toStringUtf8())));
span.addAnnotation(
"Transaction Initialized",
ImmutableMap.of(
"Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
txnLogger.log(
Level.FINER,
"Using prepared transaction {0}",
@@ -1677,9 +1696,9 @@ void initTransaction() {
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(options)
.build();
.setSession(session.getName())
.setOptions(options)
.build();
Transaction transaction =
runWithRetries(
new Callable<Transaction>() {
@@ -1703,8 +1722,8 @@ public Transaction call() throws Exception {
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
}
transactionId = transaction.getId();
span.addAnnotation("Transaction Creation Done",
TraceUtil.getTransactionAnnotations(transaction));
span.addAnnotation(
"Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction));
} catch (SpannerException e) {
span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e));
throw e;
@@ -1915,8 +1934,8 @@ private static class GrpcStruct extends Struct implements Serializable {
protected final List<Object> rowData;

/**
* Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used
* as a serialization proxy.
* Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as
* a serialization proxy.
*/
private Object writeReplace() {
Builder builder = Struct.newBuilder();
@@ -1972,7 +1991,10 @@ private Object writeReplace() {
builder.set(fieldName).toDateArray((Iterable<Date>) value);
break;
case STRUCT:
builder.add(fieldName, fieldType.getArrayElementType().getStructFields(), (Iterable<Struct>) value);
builder.add(
fieldName,
fieldType.getArrayElementType().getStructFields(),
(Iterable<Struct>) value);
break;
default:
throw new AssertionError(
@@ -1983,7 +2005,6 @@
default:
throw new AssertionError("Unhandled type code: " + fieldType.getCode());
}

}
return builder.build();
}
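
The Javadoc reflowed above describes writeReplace() acting as a serialization proxy: the lazily decoded GrpcStruct is replaced by an eagerly built, immutable Struct before being written out. A generic sketch of that pattern, illustration only and unrelated to the Spanner types:

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Illustration of the serialization-proxy pattern: the lazy form swaps itself for an eager copy. */
final class LazyText implements Serializable {
  private final transient byte[] rawBytes; // stand-in for not-yet-decoded wire data

  LazyText(byte[] rawBytes) {
    this.rawBytes = rawBytes;
  }

  /** Invoked by Java serialization; the returned proxy is written instead of this object. */
  private Object writeReplace() {
    return new EagerText(new String(rawBytes, StandardCharsets.UTF_8));
  }

  static final class EagerText implements Serializable {
    private final String text;

    EagerText(String text) {
      this.text = text;
    }
  }
}
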
@@ -2294,8 +2315,7 @@ public CloseableServerStreamIterator(ServerStream<T> stream) {
public boolean hasNext() {
try {
return iterator.hasNext();
}
catch (Exception e) {
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}
@@ -2304,18 +2324,21 @@ public boolean hasNext() {
public T next() {
try {
return iterator.next();
}
catch (Exception e) {
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Not supported: remove");
}

@Override
public void close(@Nullable String message) {
try {
stream.cancel();
}
catch (Exception e) {
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}
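
CloseableServerStreamIterator, reformatted above, adapts a Gapic ServerStream<T> to the client's CloseableIterator, converting failures to SpannerExceptions and cancelling the stream on close. A small, illustration-only sketch of the underlying ServerStream contract (Iterable plus cancel), with hypothetical names:

import com.google.api.gax.rpc.ServerStream;
import java.util.function.Predicate;

/** Illustration only: drain a Gapic ServerStream, cancelling it if the consumer stops early. */
final class ServerStreamSketch {
  static <T> void drainOrCancel(ServerStream<T> stream, Predicate<T> keepGoing) {
    try {
      for (T item : stream) { // ServerStream implements Iterable<T>
        if (!keepGoing.test(item)) {
          stream.cancel(); // tell the server to stop sending and release the call
          return;
        }
      }
    } catch (RuntimeException e) {
      stream.cancel();
      throw e;
    }
  }
}
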
@@ -2458,8 +2481,10 @@ protected PartialResultSet computeNext() {
while (true) {
// Eagerly start stream before consuming any buffered items.
if (stream == null) {
span.addAnnotation("Starting/Resuming stream",
ImmutableMap.of("ResumeToken",
span.addAnnotation(
"Starting/Resuming stream",
ImmutableMap.of(
"ResumeToken",
AttributeValue.stringAttributeValue(
resumeToken == null ? "null" : resumeToken.toStringUtf8())));
stream = checkNotNull(startStream(resumeToken));
@@ -2498,8 +2523,8 @@ protected PartialResultSet computeNext() {
}
} catch (SpannerException e) {
if (safeToRetry && e.isRetryable()) {
span.addAnnotation("Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(e));
span.addAnnotation(
"Stream broken. Safe to retry", TraceUtil.getExceptionAnnotations(e));
logger.log(Level.FINE, "Retryable exception, will sleep and retry", e);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
