Skip to content

Commit

Permalink
Make ChannelPool work (#3258)
Browse files Browse the repository at this point in the history
* Make ChannelPool work.

* Add todo for channelProvider
  • Loading branch information
yihanzhen authored May 31, 2018
1 parent f9df8a2 commit bc647b1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

package com.google.cloud.spanner;

import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.ServiceDefaults;
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -218,6 +218,10 @@ public List<ManagedChannel> getRpcChannels() {
return rpcChannels;
}

public int getNumChannels() {
return numChannels;
}

public SessionPoolOptions getSessionPoolOptions() {
return sessionPoolOptions;
}
Expand Down Expand Up @@ -353,4 +357,15 @@ protected SpannerRpc getGapicSpannerRpc() {
public Builder toBuilder() {
return new Builder(this);
}

public String getEndpoint() {
URL url;
try {
url = new URL(getHost());
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid host: " + getHost(), e);
}
return String.format(
"%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
Expand Down Expand Up @@ -106,7 +107,8 @@ public class GapicSpannerRpc implements SpannerRpc {

private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");

private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;

private final SpannerStub stub;
private final InstanceAdminStub instanceStub;
private final DatabaseAdminStub databaseStub;
Expand Down Expand Up @@ -143,17 +145,19 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());

// TODO(pongad): make channel pool work

// TODO(pongad): make RPC logging work (formerly LoggingInterceptor)
// TODO(pongad): add watchdog
// TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor)

// TODO(hzyi): make this channelProvider configurable through SpannerOptions
TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(
GrpcTransportChannel.newBuilder()
.setManagedChannel(options.getRpcChannels().get(0))
.build());
InstantiatingGrpcChannelProvider
.newBuilder()
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setPoolSize(options.getNumChannels())
.build();

CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

Expand Down

0 comments on commit bc647b1

Please sign in to comment.