diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index e37841b2435b..03b79d722e40 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -16,16 +16,43 @@ package com.google.cloud.spanner.spi.v1; +import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.pathtemplate.PathTemplate; +import com.google.cloud.ServiceOptions; +import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; +import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; +import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub; +import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub; +import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; +import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.cloud.spanner.v1.stub.GrpcSpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; +import com.google.common.base.MoreObjects; +import com.google.longrunning.GetOperationRequest; import com.google.longrunning.Operation; import com.google.protobuf.FieldMask; +import com.google.spanner.admin.database.v1.CreateDatabaseRequest; import com.google.spanner.admin.database.v1.Database; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest; +import com.google.spanner.admin.instance.v1.CreateInstanceRequest; import com.google.spanner.admin.instance.v1.Instance; import com.google.spanner.admin.instance.v1.InstanceConfig; +import com.google.spanner.admin.instance.v1.UpdateInstanceRequest; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; @@ -37,17 +64,80 @@ import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; +import io.grpc.Context; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nullable; /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ public class GapicSpannerRpc implements SpannerRpc { + private static final PathTemplate PROJECT_NAME_TEMPLATE = + PathTemplate.create("projects/{project}"); + private final SpannerStub stub; + private final InstanceAdminStub instanceStub; + private final DatabaseAdminStub databaseStub; + private final String projectId; + private final String projectName; + private final SpannerMetadataProvider metadataProvider; + + public GapicSpannerRpc(SpannerOptions options) throws IOException { + this.projectId = options.getProjectId(); + this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId); + + ApiClientHeaderProvider.Builder internalHeaderProviderBuilder = + ApiClientHeaderProvider.newBuilder(); + ApiClientHeaderProvider internalHeaderProvider = + internalHeaderProviderBuilder + .setClientLibToken( + ServiceOptions.getGoogApiClientLibName(), + GaxProperties.getLibraryVersion(options.getClass())) + .setTransportToken( + GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion()) + .build(); + + HeaderProvider mergedHeaderProvider = options.getMergedHeaderProvider(internalHeaderProvider); + this.metadataProvider = + SpannerMetadataProvider.create( + mergedHeaderProvider.getHeaders(), + internalHeaderProviderBuilder.getResourceHeaderKey()); + + // TODO(pongad): make channel pool work + + // TODO(pongad): make RPC logging work (formerly LoggingInterceptor) + // TODO(pongad): add watchdog + // TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor) + + TransportChannelProvider channelProvider = + FixedTransportChannelProvider.create( + GrpcTransportChannel.newBuilder() + .setManagedChannel(options.getRpcChannels().get(0)) + .build()); + CredentialsProvider credentialsProvider = + GrpcTransportOptions.setUpCredentialsProvider(options); - public GapicSpannerRpc() throws IOException { - stub = GrpcSpannerStub.create(SpannerStubSettings.newBuilder().build()); + this.stub = + GrpcSpannerStub.create( + SpannerStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build()); + this.instanceStub = + GrpcInstanceAdminStub.create( + InstanceAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build()); + this.databaseStub = + GrpcDatabaseAdminStub.create( + DatabaseAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build()); } @Override @@ -70,12 +160,22 @@ public Paginated listInstances( @Override public Operation createInstance(String parent, String instanceId, Instance instance) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + CreateInstanceRequest request = + CreateInstanceRequest.newBuilder() + .setParent(parent) + .setInstanceId(instanceId) + .setInstance(instance) + .build(); + // TODO: put parent in metadata + return get(instanceStub.createInstanceCallable().futureCall(request)); } @Override public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + UpdateInstanceRequest request = + UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build(); + // TODO: put instance.getName() in metadata + return get(instanceStub.updateInstanceCallable().futureCall(request)); } @Override @@ -97,13 +197,27 @@ public Paginated listDatabases( @Override public Operation createDatabase(String instanceName, String createDatabaseStatement, Iterable additionalStatements) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + CreateDatabaseRequest request = + CreateDatabaseRequest.newBuilder() + .setParent(instanceName) + .setCreateStatement(createDatabaseStatement) + .addAllExtraStatements(additionalStatements) + .build(); + // TODO: put instanceName in metadata + return get(databaseStub.createDatabaseCallable().futureCall(request)); } @Override public Operation updateDatabaseDdl(String databaseName, Iterable updateDatabaseStatements, @Nullable String updateId) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + UpdateDatabaseDdlRequest request = + UpdateDatabaseDdlRequest.newBuilder() + .setDatabase(databaseName) + .addAllStatements(updateDatabaseStatements) + .setOperationId(MoreObjects.firstNonNull(updateId, "")) + .build(); + // TODO: put databaseName in metadata + return get(databaseStub.updateDatabaseDdlCallable().futureCall(request)); } @Override @@ -123,7 +237,9 @@ public List getDatabaseDdl(String databaseName) throws SpannerException @Override public Operation getOperation(String name) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build(); + // TODO: put name in metadata + return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request)); } @Override @@ -179,6 +295,20 @@ public PartitionResponse partitionRead( PartitionReadRequest request, @Nullable Map options) throws SpannerException { // TODO(pongad): Figure out metadata // TODO(pongad): Figure out channel affinity - return stub.partitionReadCallable().call(request); + return get(stub.partitionReadCallable().futureCall(request)); + } + + /** Gets the result of an async RPC call, handling any exceptions encountered. */ + private static T get(final Future future) throws SpannerException { + final Context context = Context.current(); + try { + return future.get(); + } catch (InterruptedException e) { + // We are the sole consumer of the future, so cancel it. + future.cancel(true); + throw SpannerExceptionFactory.propagateInterrupt(e); + } catch (ExecutionException | CancellationException e) { + throw newSpannerException(context, e); + } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java index f0a7878d661f..d6b3e132e505 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java @@ -88,9 +88,8 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientCalls; import io.grpc.stub.ClientResponseObserver; -import io.opencensus.trace.export.SampledSpanStore; import io.opencensus.trace.Tracing; - +import io.opencensus.trace.export.SampledSpanStore; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -104,11 +103,11 @@ /** Implementation of Cloud Spanner remote calls using gRPC. */ public class GrpcSpannerRpc implements SpannerRpc { - + static { setupTracingConfig(); } - + private static final Logger logger = Logger.getLogger(GrpcSpannerRpc.class.getName()); private static final PathTemplate PROJECT_NAME_TEMPLATE = @@ -594,7 +593,7 @@ public void cancel(@Nullable String message) { } } - private static class LoggingInterceptor implements ClientInterceptor { + static class LoggingInterceptor implements ClientInterceptor { private final Level level; LoggingInterceptor(Level level) {