Skip to content

Commit

Permalink
Spanner: migrate streaming methods to gapic (#3139)
Browse files Browse the repository at this point in the history
* Make all the unit tests work. ITs are failing pending on another PR.

* Make integration tests work

* Documentation and format
  • Loading branch information
yihanzhen authored Apr 17, 2018
1 parent 45e76f3 commit 02f0bc7
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Struct;
Expand Down Expand Up @@ -68,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand All @@ -81,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.paging.Page;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.BaseService;
import com.google.cloud.ByteArray;
Expand Down Expand Up @@ -164,7 +165,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
SpannerImpl(SpannerOptions options) {
this(
options.getSpannerRpcV1(),
GapicSpannerRpc.create(options),
options.getGapicSpannerRpc(),
options.getPrefetchChunks(),
options);
}
Expand Down Expand Up @@ -828,7 +829,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
return setActive(new SingleReadContext(this, bound, rawGrpcRpc, defaultPrefetchChunks));
return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -839,7 +840,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
return setActive(
new SingleUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks));
new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -850,12 +851,12 @@ public ReadOnlyTransaction readOnlyTransaction() {
@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
return setActive(
new MultiUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks));
new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
public TransactionRunner readWriteTransaction() {
return setActive(new TransactionRunnerImpl(this, rawGrpcRpc, defaultPrefetchChunks));
return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand Down Expand Up @@ -1055,20 +1056,18 @@ ResultSet executeQueryInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
return new CloseableServerStreamIterator<PartialResultSet>(
rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
stream.consumer(),
session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
return new GrpcResultSet(stream, this, queryMode);
Expand Down Expand Up @@ -1168,20 +1167,18 @@ ResultSet readInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
return new CloseableServerStreamIterator<PartialResultSet>(
rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
stream.consumer(),
session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
GrpcResultSet resultSet =
Expand Down Expand Up @@ -2287,6 +2284,52 @@ interface CloseableIterator<T> extends Iterator<T> {
void close(@Nullable String message);
}

private static final class CloseableServerStreamIterator<T> implements CloseableIterator<T> {

private final ServerStream<T> stream;
private final Iterator<T> iterator;

public CloseableServerStreamIterator(ServerStream<T> stream) {
this.stream = stream;
this.iterator = stream.iterator();
}

@Override
public boolean hasNext() {
try {
return iterator.hasNext();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public T next() {
try {
return iterator.next();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Not supported: remove.");
}

@Override
public void close(@Nullable String message) {
try {
stream.cancel();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}
}

/** Adapts a streaming read/query call into an iterator over partial result sets. */
@VisibleForTesting
static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
Expand Down Expand Up @@ -343,6 +344,10 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

protected SpannerRpc getGapicSpannerRpc() {
return GapicSpannerRpc.create(this);
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
Expand Down Expand Up @@ -79,6 +80,7 @@
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
Expand Down Expand Up @@ -391,16 +393,17 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
}

@Override
public StreamingCall read(
public ServerStream<PartialResultSet> read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
throw new UnsupportedOperationException("not implemented yet");
return stub.streamingReadCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
public ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = newCallContext(options, request.getSession());
return stub.executeStreamingSqlCallable().call(request, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -366,25 +367,15 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
}

@Override
public StreamingCall read(
public ServerStream<PartialResultSet> read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
return doStreamingCall(
SpannerGrpc.METHOD_STREAMING_READ,
request,
consumer,
request.getSession(),
Option.CHANNEL_HINT.getLong(options));
throw new UnsupportedOperationException("Not implemented: read");
}

@Override
public StreamingCall executeQuery(
public ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
return doStreamingCall(
SpannerGrpc.METHOD_EXECUTE_STREAMING_SQL,
request,
consumer,
request.getSession(),
Option.CHANNEL_HINT.getLong(options));
throw new UnsupportedOperationException("Not implemented: executeQuery");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
Expand Down Expand Up @@ -197,10 +198,10 @@ Session createSession(String databaseName, @Nullable Map<String, String> labels,

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

StreamingCall read(
ServerStream<PartialResultSet> read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

StreamingCall executeQuery(
ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map<Option, ?> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public final class BatchClientImplTest {
private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn");
private static final String TIMESTAMP = "2017-11-15T10:54:20Z";

@Mock private SpannerRpc rpc;
@Mock private SpannerRpc rawGrpcRpc;
@Mock private SpannerRpc gapicRpc;
@Mock private SpannerOptions spannerOptions;
@Captor private ArgumentCaptor<Map<SpannerRpc.Option, Object>> optionsCaptor;
@Mock private BatchTransactionId txnID;
Expand All @@ -59,20 +60,20 @@ public final class BatchClientImplTest {
public void setUp() {
initMocks(this);
DatabaseId db = DatabaseId.of(DB_NAME);
SpannerImpl spanner = new SpannerImpl(rpc, rpc, 1, spannerOptions);
SpannerImpl spanner = new SpannerImpl(rawGrpcRpc, gapicRpc, 1, spannerOptions);
client = new BatchClientImpl(db, spanner);
}

@Test
public void testBatchReadOnlyTxnWithBound() throws Exception {
Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build();
when(rpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
when(gapicRpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
.thenReturn(sessionProto);
com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP);
Transaction txnMetadata =
Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build();
when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc);
when(rpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
when(spannerOptions.getGapicSpannerRpc()).thenReturn(gapicRpc);
when(gapicRpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
.thenReturn(txnMetadata);

BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong());
Expand Down
Loading

0 comments on commit 02f0bc7

Please sign in to comment.