Skip to content

Commit

Permalink
Clean up for Spanner before merging to master
Browse files Browse the repository at this point in the history
- Add TransportChannelProvider and GrpcInterceptorProvider in
  SpannerOperations GapicSpannerRpc can be configured through this
- Exposes SpannerInterceptorProvider for testing
- Make SpannerInterceptorProvider configurable
- Remove GrpcSpannerRpc and RpcChannelFactory
- Make streaming calls honor preferedChunks through StreamController
  and ResponseObserver
  • Loading branch information
yihanzhen committed Jun 8, 2018
1 parent 0de6267 commit 128c75b
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 916 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand All @@ -82,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
}

private final Random random = new Random();
private final SpannerRpc rawGrpcRpc;
private final SpannerRpc gapicRpc;
private final int defaultPrefetchChunks;

Expand All @@ -153,12 +152,10 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
private boolean spannerIsClosed = false;

SpannerImpl(
SpannerRpc rawGrpcRpc,
SpannerRpc gapicRpc,
int defaultPrefetchChunks,
SpannerOptions options) {
super(options);
this.rawGrpcRpc = rawGrpcRpc;
this.gapicRpc = gapicRpc;
this.defaultPrefetchChunks = defaultPrefetchChunks;
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc);
Expand All @@ -169,7 +166,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
SpannerImpl(SpannerOptions options) {
this(
options.getSpannerRpcV1(),
options.getGapicSpannerRpc(),
options.getPrefetchChunks(),
options);
}
Expand Down Expand Up @@ -336,12 +332,10 @@ public void close() {
} catch (InterruptedException | ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
for (ManagedChannel channel : getOptions().getRpcChannels()) {
try {
channel.shutdown();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Failed to close channel", e);
}
try {
gapicRpc.shutdown();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Failed to close channel", e);
}
}

Expand Down Expand Up @@ -1064,21 +1058,24 @@ ResultSet executeQueryInternalWithOptions(
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
return new CloseableServerStreamIterator<PartialResultSet>(
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
stream.consumer(),
session.options);
// StreamController does not auto-request 1 message. Kick it off mannually
call.request(1);
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
}
};
return new GrpcResultSet(stream, this, queryMode);
Expand Down Expand Up @@ -1178,18 +1175,21 @@ ResultSet readInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
return new CloseableServerStreamIterator<PartialResultSet>(
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// TODO(hzyi): make resume work
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
stream.consumer(),
session.options);
// StreamController does not auto-request 1 message. Kick it off mannually
call.request(1);
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
}
};
GrpcResultSet resultSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package com.google.cloud.spanner;

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceDefaults;
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -53,7 +55,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
"https://www.googleapis.com/auth/spanner.admin",
"https://www.googleapis.com/auth/spanner.data");
private static final int MAX_CHANNELS = 256;
private static final RpcChannelFactory DEFAULT_RPC_CHANNEL_FACTORY = new NettyRpcChannelFactory();
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes

/** Default implementation of {@code SpannerFactory}. */
private static class DefaultSpannerFactory implements SpannerFactory {
Expand All @@ -71,29 +74,28 @@ private static class DefaultSpannerRpcFactory implements SpannerRpcFactory {

@Override
public ServiceRpc create(SpannerOptions options) {
return new GrpcSpannerRpc(options);
return new GapicSpannerRpc(options);
}
}

private final List<ManagedChannel> rpcChannels;
private final TransportChannelProvider channelProvider;
private final GrpcInterceptorProvider interceptorProvider;
private final SessionPoolOptions sessionPoolOptions;
private final int prefetchChunks;
private final int numChannels;
private final ImmutableMap<String, String> sessionLabels;

private SpannerOptions(Builder builder) {
super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults());
numChannels = builder.numChannels;
String userAgent = getUserAgent();
RpcChannelFactory defaultRpcChannelFactory =
userAgent == null
? DEFAULT_RPC_CHANNEL_FACTORY
: new NettyRpcChannelFactory(userAgent);
rpcChannels =
createChannels(
getHost(),
MoreObjects.firstNonNull(builder.rpcChannelFactory, defaultRpcChannelFactory),
numChannels);
numChannels = builder.numChannels;
Preconditions.checkArgument(
numChannels >= 1 && numChannels <= MAX_CHANNELS,
"Number of channels must fall in the range [1, %s], found: %s",
MAX_CHANNELS,
numChannels);

channelProvider = builder.channelProvider;
interceptorProvider = builder.interceptorProvider;
sessionPoolOptions =
builder.sessionPoolOptions != null
? builder.sessionPoolOptions
Expand All @@ -107,10 +109,11 @@ public static class Builder
extends ServiceOptions.Builder<
Spanner, SpannerOptions, SpannerOptions.Builder> {
private static final int DEFAULT_PREFETCH_CHUNKS = 4;
private RpcChannelFactory rpcChannelFactory;
private TransportChannelProvider channelProvider;
private GrpcInterceptorProvider interceptorProvider;

/** By default, we create 4 channels per {@link SpannerOptions} */
private int numChannels = 4;

private int prefetchChunks = DEFAULT_PREFETCH_CHUNKS;
private SessionPoolOptions sessionPoolOptions;
private ImmutableMap<String, String> sessionLabels;
Expand All @@ -123,6 +126,8 @@ private Builder() {}
this.sessionPoolOptions = options.sessionPoolOptions;
this.prefetchChunks = options.prefetchChunks;
this.sessionLabels = options.sessionLabels;
this.channelProvider = options.channelProvider;
this.interceptorProvider = options.interceptorProvider;
}

@Override
Expand All @@ -134,9 +139,21 @@ public Builder setTransportOptions(TransportOptions transportOptions) {
return super.setTransportOptions(transportOptions);
}

/** Sets the factory for creating gRPC channels. If not set, a default will be used. */
public Builder setRpcChannelFactory(RpcChannelFactory factory) {
this.rpcChannelFactory = factory;
/**
* Sets the {@code ChannelProvider}. {@link GapicSpannerRpc} would create a default
* one if none is provided.
*/
public Builder setChannelProvider(TransportChannelProvider channelProvider) {
this.channelProvider = channelProvider;
return this;
}

/**
* Sets the {@code GrpcInterceptorProvider}. {@link GapicSpannerRpc} would create
* a default one if none is provided.
*/
public Builder setInterceptorProvider(GrpcInterceptorProvider interceptorProvider) {
this.interceptorProvider = interceptorProvider;
return this;
}

Expand Down Expand Up @@ -197,14 +214,6 @@ public SpannerOptions build() {
}
}

/**
* Interface for gRPC channel creation. Most users won't need to use this, as the default covers
* typical deployment scenarios.
*/
public interface RpcChannelFactory {
ManagedChannel newChannel(String host, int port);
}

/** Returns default instance of {@code SpannerOptions}. */
public static SpannerOptions getDefaultInstance() {
return newBuilder().build();
Expand All @@ -214,8 +223,12 @@ public static Builder newBuilder() {
return new Builder();
}

public List<ManagedChannel> getRpcChannels() {
return rpcChannels;
public TransportChannelProvider getChannelProvider() {
return channelProvider;
}

public GrpcInterceptorProvider getInterceptorProvider() {
return interceptorProvider;
}

public int getNumChannels() {
Expand All @@ -238,88 +251,11 @@ public static GrpcTransportOptions getDefaultGrpcTransportOptions() {
return GrpcTransportOptions.newBuilder().build();
}

/**
* Returns the default RPC channel factory used when none is specified. This may be useful for
* callers that wish to add interceptors to gRPC channels used by the Cloud Spanner client
* library.
*/
public static RpcChannelFactory getDefaultRpcChannelFactory() {
return DEFAULT_RPC_CHANNEL_FACTORY;
}

@Override
protected String getDefaultHost() {
return DEFAULT_HOST;
}

private static List<ManagedChannel> createChannels(
String rootUrl, RpcChannelFactory factory, int numChannels) {
Preconditions.checkArgument(
numChannels >= 1 && numChannels <= MAX_CHANNELS,
"Number of channels must fall in the range [1, %s], found: %s",
MAX_CHANNELS,
numChannels);
ImmutableList.Builder<ManagedChannel> builder = ImmutableList.builder();
for (int i = 0; i < numChannels; i++) {
builder.add(createChannel(rootUrl, factory));
}
return builder.build();
}

private static ManagedChannel createChannel(String rootUrl, RpcChannelFactory factory) {
URL url;
try {
url = new URL(rootUrl);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + rootUrl, e);
}
ManagedChannel channel =
factory.newChannel(url.getHost(), url.getPort() > 0 ? url.getPort() : url.getDefaultPort());
return channel;
}

static class NettyRpcChannelFactory implements RpcChannelFactory {
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes
private final String userAgent;
private final List<ClientInterceptor> interceptors;

NettyRpcChannelFactory() {
this(null);
}

NettyRpcChannelFactory(String userAgent) {
this(userAgent, ImmutableList.<ClientInterceptor>of());
}

NettyRpcChannelFactory(String userAgent, List<ClientInterceptor> interceptors) {
this.userAgent = userAgent;
this.interceptors = interceptors;
}

@Override
public ManagedChannel newChannel(String host, int port) {
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(host, port)
.sslContext(newSslContext())
.intercept(interceptors)
.maxHeaderListSize(MAX_HEADER_LIST_SIZE)
.maxMessageSize(MAX_MESSAGE_SIZE);
if (userAgent != null) {
builder.userAgent(userAgent);
}
return builder.build();
}

private static SslContext newSslContext() {
try {
return GrpcSslContexts.forClient().ciphers(null).build();
} catch (SSLException e) {
throw new RuntimeException("SSL configuration failed: " + e.getMessage(), e);
}
}
}

private static class SpannerDefaults implements
ServiceDefaults<Spanner, SpannerOptions> {

Expand Down Expand Up @@ -348,10 +284,6 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

protected SpannerRpc getGapicSpannerRpc() {
return GapicSpannerRpc.create(this);
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Loading

0 comments on commit 128c75b

Please sign in to comment.