Skip to content

Commit

Permalink
spanner: implement LRO methods using GAPIC stub (#3039)
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad authored Mar 19, 2018
1 parent 91dd689 commit be50109
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,43 @@

package com.google.cloud.spanner.spi.v1;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.ServiceOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.base.MoreObjects;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.protobuf.FieldMask;
import com.google.spanner.admin.database.v1.CreateDatabaseRequest;
import com.google.spanner.admin.database.v1.Database;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest;
import com.google.spanner.admin.instance.v1.CreateInstanceRequest;
import com.google.spanner.admin.instance.v1.Instance;
import com.google.spanner.admin.instance.v1.InstanceConfig;
import com.google.spanner.admin.instance.v1.UpdateInstanceRequest;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand All @@ -37,17 +64,80 @@
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.Transaction;
import io.grpc.Context;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
public class GapicSpannerRpc implements SpannerRpc {
private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");

private final SpannerStub stub;
private final InstanceAdminStub instanceStub;
private final DatabaseAdminStub databaseStub;
private final String projectId;
private final String projectName;
private final SpannerMetadataProvider metadataProvider;

public GapicSpannerRpc(SpannerOptions options) throws IOException {
this.projectId = options.getProjectId();
this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId);

ApiClientHeaderProvider.Builder internalHeaderProviderBuilder =
ApiClientHeaderProvider.newBuilder();
ApiClientHeaderProvider internalHeaderProvider =
internalHeaderProviderBuilder
.setClientLibToken(
ServiceOptions.getGoogApiClientLibName(),
GaxProperties.getLibraryVersion(options.getClass()))
.setTransportToken(
GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion())
.build();

HeaderProvider mergedHeaderProvider = options.getMergedHeaderProvider(internalHeaderProvider);
this.metadataProvider =
SpannerMetadataProvider.create(
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());

// TODO(pongad): make channel pool work

// TODO(pongad): make RPC logging work (formerly LoggingInterceptor)
// TODO(pongad): add watchdog
// TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor)

TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(
GrpcTransportChannel.newBuilder()
.setManagedChannel(options.getRpcChannels().get(0))
.build());
CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

public GapicSpannerRpc() throws IOException {
stub = GrpcSpannerStub.create(SpannerStubSettings.newBuilder().build());
this.stub =
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
}

@Override
Expand All @@ -70,12 +160,22 @@ public Paginated<Instance> listInstances(
@Override
public Operation createInstance(String parent, String instanceId, Instance instance)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
CreateInstanceRequest request =
CreateInstanceRequest.newBuilder()
.setParent(parent)
.setInstanceId(instanceId)
.setInstance(instance)
.build();
// TODO: put parent in metadata
return get(instanceStub.createInstanceCallable().futureCall(request));
}

@Override
public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
UpdateInstanceRequest request =
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
// TODO: put instance.getName() in metadata
return get(instanceStub.updateInstanceCallable().futureCall(request));
}

@Override
Expand All @@ -97,13 +197,27 @@ public Paginated<Database> listDatabases(
@Override
public Operation createDatabase(String instanceName, String createDatabaseStatement,
Iterable<String> additionalStatements) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
CreateDatabaseRequest request =
CreateDatabaseRequest.newBuilder()
.setParent(instanceName)
.setCreateStatement(createDatabaseStatement)
.addAllExtraStatements(additionalStatements)
.build();
// TODO: put instanceName in metadata
return get(databaseStub.createDatabaseCallable().futureCall(request));
}

@Override
public Operation updateDatabaseDdl(String databaseName, Iterable<String> updateDatabaseStatements,
@Nullable String updateId) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
UpdateDatabaseDdlRequest request =
UpdateDatabaseDdlRequest.newBuilder()
.setDatabase(databaseName)
.addAllStatements(updateDatabaseStatements)
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
.build();
// TODO: put databaseName in metadata
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request));
}

@Override
Expand All @@ -123,7 +237,9 @@ public List<String> getDatabaseDdl(String databaseName) throws SpannerException

@Override
public Operation getOperation(String name) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
// TODO: put name in metadata
return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request));
}

@Override
Expand Down Expand Up @@ -179,6 +295,20 @@ public PartitionResponse partitionRead(
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
// TODO(pongad): Figure out metadata
// TODO(pongad): Figure out channel affinity
return stub.partitionReadCallable().call(request);
return get(stub.partitionReadCallable().futureCall(request));
}

/** Gets the result of an async RPC call, handling any exceptions encountered. */
private static <T> T get(final Future<T> future) throws SpannerException {
final Context context = Context.current();
try {
return future.get();
} catch (InterruptedException e) {
// We are the sole consumer of the future, so cancel it.
future.cancel(true);
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (ExecutionException | CancellationException e) {
throw newSpannerException(context, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import io.opencensus.trace.export.SampledSpanStore;
import io.opencensus.trace.Tracing;

import io.opencensus.trace.export.SampledSpanStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -104,11 +103,11 @@

/** Implementation of Cloud Spanner remote calls using gRPC. */
public class GrpcSpannerRpc implements SpannerRpc {

static {
setupTracingConfig();
}

private static final Logger logger = Logger.getLogger(GrpcSpannerRpc.class.getName());

private static final PathTemplate PROJECT_NAME_TEMPLATE =
Expand Down Expand Up @@ -594,7 +593,7 @@ public void cancel(@Nullable String message) {
}
}

private static class LoggingInterceptor implements ClientInterceptor {
static class LoggingInterceptor implements ClientInterceptor {
private final Level level;

LoggingInterceptor(Level level) {
Expand Down

0 comments on commit be50109

Please sign in to comment.