Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: migrate streaming methods to gapic #3139

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Struct;
Expand Down Expand Up @@ -68,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand All @@ -81,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.paging.Page;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.BaseService;
import com.google.cloud.ByteArray;
Expand Down Expand Up @@ -164,7 +165,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
SpannerImpl(SpannerOptions options) {
this(
options.getSpannerRpcV1(),
GapicSpannerRpc.create(options),
options.getGapicSpannerRpc(),
options.getPrefetchChunks(),
options);
}
Expand Down Expand Up @@ -828,7 +829,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
return setActive(new SingleReadContext(this, bound, rawGrpcRpc, defaultPrefetchChunks));
return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -839,7 +840,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
return setActive(
new SingleUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks));
new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -850,12 +851,12 @@ public ReadOnlyTransaction readOnlyTransaction() {
@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
return setActive(
new MultiUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks));
new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
public TransactionRunner readWriteTransaction() {
return setActive(new TransactionRunnerImpl(this, rawGrpcRpc, defaultPrefetchChunks));
return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand Down Expand Up @@ -1055,20 +1056,18 @@ ResultSet executeQueryInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
return new CloseableServerStreamIterator<PartialResultSet>(
rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
stream.consumer(),
session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
return new GrpcResultSet(stream, this, queryMode);
Expand Down Expand Up @@ -1168,20 +1167,18 @@ ResultSet readInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
return new CloseableServerStreamIterator<PartialResultSet>(
rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
stream.consumer(),
session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
GrpcResultSet resultSet =
Expand Down Expand Up @@ -2287,6 +2284,52 @@ interface CloseableIterator<T> extends Iterator<T> {
void close(@Nullable String message);
}

private static final class CloseableServerStreamIterator<T> implements CloseableIterator<T> {

This comment was marked as spam.


private final ServerStream<T> stream;
private final Iterator<T> iterator;

public CloseableServerStreamIterator(ServerStream<T> stream) {
this.stream = stream;
this.iterator = stream.iterator();
}

@Override
public boolean hasNext() {
try {
return iterator.hasNext();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public T next() {
try {
return iterator.next();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Not supported: remove.");
}

@Override
public void close(@Nullable String message) {
try {
stream.cancel();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}
}

/** Adapts a streaming read/query call into an iterator over partial result sets. */
@VisibleForTesting
static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
Expand Down Expand Up @@ -343,6 +344,10 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

protected SpannerRpc getGapicSpannerRpc() {
return GapicSpannerRpc.create(this);
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
Expand Down Expand Up @@ -79,6 +80,7 @@
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
Expand Down Expand Up @@ -391,16 +393,17 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
}

@Override
public StreamingCall read(
public ServerStream<PartialResultSet> read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
throw new UnsupportedOperationException("not implemented yet");
return stub.streamingReadCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
public ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = newCallContext(options, request.getSession());
return stub.executeStreamingSqlCallable().call(request, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -366,25 +367,15 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
}

@Override
public StreamingCall read(
public ServerStream<PartialResultSet> read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
return doStreamingCall(
SpannerGrpc.METHOD_STREAMING_READ,
request,
consumer,
request.getSession(),
Option.CHANNEL_HINT.getLong(options));
throw new UnsupportedOperationException("Not implemented: read");
}

@Override
public StreamingCall executeQuery(
public ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
return doStreamingCall(
SpannerGrpc.METHOD_EXECUTE_STREAMING_SQL,
request,
consumer,
request.getSession(),
Option.CHANNEL_HINT.getLong(options));
throw new UnsupportedOperationException("Not implemented: executeQuery");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
Expand Down Expand Up @@ -197,10 +198,10 @@ Session createSession(String databaseName, @Nullable Map<String, String> labels,

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

StreamingCall read(
ServerStream<PartialResultSet> read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

StreamingCall executeQuery(
ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map<Option, ?> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public final class BatchClientImplTest {
private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn");
private static final String TIMESTAMP = "2017-11-15T10:54:20Z";

@Mock private SpannerRpc rpc;
@Mock private SpannerRpc rawGrpcRpc;
@Mock private SpannerRpc gapicRpc;
@Mock private SpannerOptions spannerOptions;
@Captor private ArgumentCaptor<Map<SpannerRpc.Option, Object>> optionsCaptor;
@Mock private BatchTransactionId txnID;
Expand All @@ -59,20 +60,20 @@ public final class BatchClientImplTest {
public void setUp() {
initMocks(this);
DatabaseId db = DatabaseId.of(DB_NAME);
SpannerImpl spanner = new SpannerImpl(rpc, rpc, 1, spannerOptions);
SpannerImpl spanner = new SpannerImpl(rawGrpcRpc, gapicRpc, 1, spannerOptions);
client = new BatchClientImpl(db, spanner);
}

@Test
public void testBatchReadOnlyTxnWithBound() throws Exception {
Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build();
when(rpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
when(gapicRpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
.thenReturn(sessionProto);
com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP);
Transaction txnMetadata =
Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build();
when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc);
when(rpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
when(spannerOptions.getGapicSpannerRpc()).thenReturn(gapicRpc);
when(gapicRpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
.thenReturn(txnMetadata);

BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong());
Expand Down
Loading