From f2085900a916e7761886619b0c85ab4e6a5fc377 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Mon, 9 Apr 2018 15:10:55 -0700 Subject: [PATCH 1/3] Make all the unit tests work. ITs are failing pending on another PR. --- .../com/google/cloud/spanner/SpannerImpl.java | 79 +++++++----- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 12 +- .../cloud/spanner/spi/v1/GrpcSpannerRpc.java | 19 +-- .../cloud/spanner/spi/v1/SpannerRpc.java | 5 +- .../spanner/ServerStreamingStashCallable.java | 116 ++++++++++++++++++ .../google/cloud/spanner/SessionImplTest.java | 23 ++-- 6 files changed, 190 insertions(+), 64 deletions(-) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 7558b9cb36c3..363b2b5e9d0d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -25,6 +25,7 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; import com.google.api.gax.paging.Page; +import com.google.api.gax.rpc.ServerStream; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.BaseService; import com.google.cloud.ByteArray; @@ -828,7 +829,7 @@ public ReadContext singleUse() { @Override public ReadContext singleUse(TimestampBound bound) { - return setActive(new SingleReadContext(this, bound, rawGrpcRpc, defaultPrefetchChunks)); + return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks)); } @Override @@ -839,7 +840,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() { @Override public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { return setActive( - new SingleUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks)); + new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks)); } @Override @@ -850,12 +851,12 @@ public ReadOnlyTransaction readOnlyTransaction() { @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { return setActive( - new MultiUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks)); + new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks)); } @Override public TransactionRunner readWriteTransaction() { - return setActive(new TransactionRunnerImpl(this, rawGrpcRpc, defaultPrefetchChunks)); + return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks)); } @Override @@ -1055,20 +1056,14 @@ ResultSet executeQueryInternalWithOptions( new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { - GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); - SpannerRpc.StreamingCall call = - rpc.executeQuery( - resumeToken == null - ? request - : request.toBuilder().setResumeToken(resumeToken).build(), - stream.consumer(), - session.options); - // We get one message for free. - if (prefetchChunks > 1) { - call.request(prefetchChunks - 1); - } - stream.setCall(call); - return stream; + return new CloseableServerStreamIterator(rpc.executeQuery( + resumeToken == null + ? request + : request.toBuilder().setResumeToken(resumeToken).build(), + null, + session.options)); + + // let resume fail for now } }; return new GrpcResultSet(stream, this, queryMode); @@ -1168,20 +1163,14 @@ ResultSet readInternalWithOptions( new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { - GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); - SpannerRpc.StreamingCall call = - rpc.read( - resumeToken == null - ? request - : request.toBuilder().setResumeToken(resumeToken).build(), - stream.consumer(), - session.options); - // We get one message for free. - if (prefetchChunks > 1) { - call.request(prefetchChunks - 1); - } - stream.setCall(call); - return stream; + return new CloseableServerStreamIterator(rpc.read( + resumeToken == null + ? request + : request.toBuilder().setResumeToken(resumeToken).build(), + null, + session.options)); + + // let resume fail for now } }; GrpcResultSet resultSet = @@ -2287,6 +2276,32 @@ interface CloseableIterator extends Iterator { void close(@Nullable String message); } + private static final class CloseableServerStreamIterator implements CloseableIterator { + + private final ServerStream stream; + private final Iterator iterator; + + public CloseableServerStreamIterator(ServerStream stream) { + this.stream = stream; + this.iterator = stream.iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public void close(@Nullable String message) { + stream.cancel(); + } + } + /** Adapts a streaming read/query call into an iterator over partial result sets. */ @VisibleForTesting static class GrpcStreamIterator extends AbstractIterator diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 04c05b64e52a..9c3c50418947 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -27,6 +27,7 @@ import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; @@ -79,6 +80,7 @@ import com.google.spanner.v1.PartitionQueryRequest; import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; +import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; @@ -391,16 +393,18 @@ public void deleteSession(String sessionName, @Nullable Map options) } @Override - public StreamingCall read( + public ServerStream read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { GrpcCallContext context = newCallContext(options, request.getSession()); - throw new UnsupportedOperationException("not implemented yet"); + return stub.streamingReadCallable().call(request, context); } @Override - public StreamingCall executeQuery( + public ServerStream executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - throw new UnsupportedOperationException("Not implemented yet."); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + return stub.executeStreamingSqlCallable().call(request, context); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java index d6b3e132e505..e91b140ef4a0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java @@ -22,6 +22,7 @@ import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.ServerStream; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; @@ -366,25 +367,15 @@ public void deleteSession(String sessionName, @Nullable Map options) } @Override - public StreamingCall read( + public ServerStream read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - return doStreamingCall( - SpannerGrpc.METHOD_STREAMING_READ, - request, - consumer, - request.getSession(), - Option.CHANNEL_HINT.getLong(options)); + throw new UnsupportedOperationException("Not implemented: read"); } @Override - public StreamingCall executeQuery( + public ServerStream executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - return doStreamingCall( - SpannerGrpc.METHOD_EXECUTE_STREAMING_SQL, - request, - consumer, - request.getSession(), - Option.CHANNEL_HINT.getLong(options)); + throw new UnsupportedOperationException("Not implemented: executeQuery"); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 9d7066c5e55c..2c0ba3be4cc1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner.spi.v1; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.ServiceRpc; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; @@ -197,10 +198,10 @@ Session createSession(String databaseName, @Nullable Map labels, void deleteSession(String sessionName, @Nullable Map options) throws SpannerException; - StreamingCall read( + ServerStream read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options); - StreamingCall executeQuery( + ServerStream executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options); Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map options) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java new file mode 100644 index 000000000000..026d10f95cf0 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java @@ -0,0 +1,116 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.common.base.Preconditions; +import com.google.common.collect.Queues; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CancellationException; + +public class ServerStreamingStashCallable + extends ServerStreamingCallable { + private List responseList; + + public ServerStreamingStashCallable() { + responseList = new ArrayList<>(); + } + + public ServerStreamingStashCallable(List responseList) { + this.responseList = responseList; + } + + @Override + public void call( + RequestT request, ResponseObserver responseObserver, ApiCallContext context) { + Preconditions.checkNotNull(responseObserver); + + StreamControllerStash controller = + new StreamControllerStash<>(responseList, responseObserver); + controller.start(); + } + + // Minimal implementation of back pressure aware stream controller. Not threadsafe + private static class StreamControllerStash implements StreamController { + final ResponseObserver observer; + final Queue queue; + boolean autoFlowControl = true; + long numPending; + Throwable error; + boolean delivering, closed; + + public StreamControllerStash( + List responseList, ResponseObserver observer) { + this.observer = observer; + this.queue = Queues.newArrayDeque(responseList); + } + + public void start() { + observer.onStart(this); + if (autoFlowControl) { + numPending = Integer.MAX_VALUE; + } + deliver(); + } + + @Override + public void disableAutoInboundFlowControl() { + autoFlowControl = false; + } + + @Override + public void request(int count) { + numPending += count; + deliver(); + } + + @Override + public void cancel() { + error = new CancellationException("User cancelled stream"); + deliver(); + } + + private void deliver() { + if (delivering || closed) return; + delivering = true; + + try { + while (error == null && numPending > 0 && !queue.isEmpty()) { + numPending--; + observer.onResponse(queue.poll()); + } + + if (error != null || queue.isEmpty()) { + if (error != null) { + observer.onError(error); + } else { + observer.onComplete(); + } + closed = true; + } + } finally { + delivering = false; + } + } + } +} \ No newline at end of file diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 08e1bf1e14f3..e0cee1bcb506 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -19,6 +19,8 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.Timestamp; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; @@ -280,18 +282,15 @@ public void request(int numMessages) {} } private void mockRead(final PartialResultSet myResultSet) { - final ArgumentCaptor consumer = - ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); - Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options))) - .then( - new Answer() { - @Override - public SpannerRpc.StreamingCall answer(InvocationOnMock invocation) throws Throwable { - consumer.getValue().onPartialResultSet(myResultSet); - consumer.getValue().onCompleted(); - return new NoOpStreamingCall(); - } - }); + ServerStreamingCallable serverStreamingCallable = + new ServerStreamingStashCallable(Arrays.asList(myResultSet)); + final ServerStream mockServerStream = serverStreamingCallable.call(null); + Mockito.when( + rpc.read( + Mockito.any(), + Mockito.any(), + Mockito.eq(options))) + .thenReturn(mockServerStream); } @Test From 067a190296323e50ea84bcfbef7b02212b7f3f46 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Fri, 13 Apr 2018 16:09:05 -0700 Subject: [PATCH 2/3] Make integration tests work --- .../google/cloud/spanner/BatchClientImpl.java | 5 +-- .../com/google/cloud/spanner/SpannerImpl.java | 31 +++++++++++++++---- .../google/cloud/spanner/SpannerOptions.java | 5 +++ .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 3 +- .../cloud/spanner/BatchClientImplTest.java | 11 ++++--- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index dbb1d705d32c..e07c935b0f78 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -23,6 +23,7 @@ import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.SpannerImpl.SessionImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.spi.v1.GapicSpannerRpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.protobuf.Struct; @@ -68,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa super( checkNotNull(session), checkNotNull(bound), - checkNotNull(spanner).getOptions().getSpannerRpcV1(), + checkNotNull(spanner).getOptions().getGapicSpannerRpc(), spanner.getOptions().getPrefetchChunks()); this.sessionName = session.getName(); this.options = session.getOptions(); @@ -81,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa checkNotNull(session), checkNotNull(batchTransactionId).getTransactionId(), batchTransactionId.getTimestamp(), - checkNotNull(spanner).getOptions().getSpannerRpcV1(), + checkNotNull(spanner).getOptions().getGapicSpannerRpc(), spanner.getOptions().getPrefetchChunks()); this.sessionName = session.getName(); this.options = session.getOptions(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 363b2b5e9d0d..be04e796b9d1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -165,7 +165,7 @@ class SpannerImpl extends BaseService implements Spanner { SpannerImpl(SpannerOptions options) { this( options.getSpannerRpcV1(), - GapicSpannerRpc.create(options), + options.getGapicSpannerRpc(), options.getPrefetchChunks(), options); } @@ -1063,7 +1063,9 @@ CloseableIterator startStream(@Nullable ByteString resumeToken null, session.options)); - // let resume fail for now + // Let resume fail for now. Gapic has its own resume, but in order not + // to introduce too much change at a time, we decide to plumb up + // ServerStream first and then figure out how to make resume work } }; return new GrpcResultSet(stream, this, queryMode); @@ -1170,7 +1172,9 @@ CloseableIterator startStream(@Nullable ByteString resumeToken null, session.options)); - // let resume fail for now + // Let resume fail for now. Gapic has its own resume, but in order not + // to introduce too much change at a time, we decide to plumb up + // ServerStream first and then figure out how to make resume work } }; GrpcResultSet resultSet = @@ -2288,17 +2292,32 @@ public CloseableServerStreamIterator(ServerStream stream) { @Override public boolean hasNext() { - return iterator.hasNext(); + try { + return iterator.hasNext(); + } + catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } } @Override public T next() { - return iterator.next(); + try { + return iterator.next(); + } + catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } } @Override public void close(@Nullable String message) { - stream.cancel(); + try { + stream.cancel(); + } + catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index b5ca59c1b9c1..a8990112c769 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -21,6 +21,7 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceRpc; import com.google.cloud.TransportOptions; +import com.google.cloud.spanner.spi.v1.GapicSpannerRpc; import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.SpannerRpcFactory; @@ -343,6 +344,10 @@ protected SpannerRpc getSpannerRpcV1() { return (SpannerRpc) getRpc(); } + protected SpannerRpc getGapicSpannerRpc() { + return GapicSpannerRpc.create(this); + } + @SuppressWarnings("unchecked") @Override public Builder toBuilder() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 9c3c50418947..b000bcda7c20 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -402,8 +402,7 @@ public ServerStream read( @Override public ServerStream executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + GrpcCallContext context = newCallContext(options, request.getSession()); return stub.executeStreamingSqlCallable().call(request, context); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index da9a39cb057d..a17537bfead0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -48,7 +48,8 @@ public final class BatchClientImplTest { private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn"); private static final String TIMESTAMP = "2017-11-15T10:54:20Z"; - @Mock private SpannerRpc rpc; + @Mock private SpannerRpc rawGrpcRpc; + @Mock private SpannerRpc gapicRpc; @Mock private SpannerOptions spannerOptions; @Captor private ArgumentCaptor> optionsCaptor; @Mock private BatchTransactionId txnID; @@ -59,20 +60,20 @@ public final class BatchClientImplTest { public void setUp() { initMocks(this); DatabaseId db = DatabaseId.of(DB_NAME); - SpannerImpl spanner = new SpannerImpl(rpc, rpc, 1, spannerOptions); + SpannerImpl spanner = new SpannerImpl(rawGrpcRpc, gapicRpc, 1, spannerOptions); client = new BatchClientImpl(db, spanner); } @Test public void testBatchReadOnlyTxnWithBound() throws Exception { Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build(); - when(rpc.createSession(eq(DB_NAME), (Map) anyMap(), optionsCaptor.capture())) + when(gapicRpc.createSession(eq(DB_NAME), (Map) anyMap(), optionsCaptor.capture())) .thenReturn(sessionProto); com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); - when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc); - when(rpc.beginTransaction(Mockito.any(), optionsCaptor.capture())) + when(spannerOptions.getGapicSpannerRpc()).thenReturn(gapicRpc); + when(gapicRpc.beginTransaction(Mockito.any(), optionsCaptor.capture())) .thenReturn(txnMetadata); BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong()); From ef50e69a743a9a6563a98f6000f22afcd7f3dda3 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Mon, 16 Apr 2018 16:26:13 -0700 Subject: [PATCH 3/3] Documentation and format --- .../com/google/cloud/spanner/SpannerImpl.java | 35 ++++++++++++------- .../spanner/ServerStreamingStashCallable.java | 5 ++- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index be04e796b9d1..5883dcb3ca6f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -1056,13 +1056,15 @@ ResultSet executeQueryInternalWithOptions( new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { - return new CloseableServerStreamIterator(rpc.executeQuery( - resumeToken == null - ? request - : request.toBuilder().setResumeToken(resumeToken).build(), - null, - session.options)); - + return new CloseableServerStreamIterator( + rpc.executeQuery( + resumeToken == null + ? request + : request.toBuilder().setResumeToken(resumeToken).build(), + null, + session.options)); + + // TODO(hzyi): make resume work // Let resume fail for now. Gapic has its own resume, but in order not // to introduce too much change at a time, we decide to plumb up // ServerStream first and then figure out how to make resume work @@ -1165,13 +1167,15 @@ ResultSet readInternalWithOptions( new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { - return new CloseableServerStreamIterator(rpc.read( - resumeToken == null - ? request - : request.toBuilder().setResumeToken(resumeToken).build(), - null, - session.options)); + return new CloseableServerStreamIterator( + rpc.read( + resumeToken == null + ? request + : request.toBuilder().setResumeToken(resumeToken).build(), + null, + session.options)); + // TODO(hzyi): make resume work // Let resume fail for now. Gapic has its own resume, but in order not // to introduce too much change at a time, we decide to plumb up // ServerStream first and then figure out how to make resume work @@ -2310,6 +2314,11 @@ public T next() { } } + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported: remove."); + } + @Override public void close(@Nullable String message) { try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java index 026d10f95cf0..da46231eea50 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ServerStreamingStashCallable.java @@ -18,7 +18,6 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StreamController; import com.google.common.base.Preconditions; @@ -28,6 +27,10 @@ import java.util.Queue; import java.util.concurrent.CancellationException; +/** + * TODO(hzyi): convert this class into a general utility class + * This class is copied from gax and is used for testing ServerStream only. + */ public class ServerStreamingStashCallable extends ServerStreamingCallable { private List responseList;