Skip to content

Commit

Permalink
feat(bigtable): support partial results in InstanceAdminClient.Cluste…
Browse files Browse the repository at this point in the history
…rs() (#2932)

Co-authored-by: Chris Cotter <[email protected]>
Co-authored-by: Christopher Wilcox <[email protected]>
  • Loading branch information
3 people authored Feb 8, 2021
1 parent 0b4370a commit 28decb5
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 2 deletions.
11 changes: 9 additions & 2 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,10 @@ func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, c
return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
}

// Clusters lists the clusters in an instance.
// Clusters lists the clusters in an instance. If any location
// (cluster) is unavailable due to some transient conditions, Clusters
// returns partial results and ErrPartiallyUnavailable error with
// unavailable locations list.
func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) {
ctx = mergeOutgoingMetadata(ctx, iac.md)
req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID}
Expand All @@ -1032,7 +1035,6 @@ func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string)
if err != nil {
return nil, err
}
// TODO(garyelliott): Deal with failed_locations.
var cis []*ClusterInfo
for _, c := range res.Clusters {
nameParts := strings.Split(c.Name, "/")
Expand All @@ -1045,6 +1047,11 @@ func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string)
StorageType: storageTypeFromProto(c.DefaultStorageType),
})
}
if len(res.FailedLocations) > 0 {
// Return partial results and an error in
// case of some locations are unavailable.
return cis, ErrPartiallyUnavailable{res.FailedLocations}
}
return cis, nil
}

Expand Down
117 changes: 117 additions & 0 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ import (
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
v1 "google.golang.org/genproto/googleapis/iam/v1"
longrunning "google.golang.org/genproto/googleapis/longrunning"
grpc "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

const (
Expand Down Expand Up @@ -1809,6 +1813,119 @@ func TestIntegration_AdminSnapshot(t *testing.T) {
}
}

// instanceAdminClientMock is used to test FailedLocations field processing.
type instanceAdminClientMock struct {
Clusters []*btapb.Cluster
UnavailableLocations []string
}

func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) {
res := btapb.ListClustersResponse{
Clusters: iacm.Clusters,
FailedLocations: iacm.UnavailableLocations,
}
return &res, nil
}

func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) PartialUpdateInstance(ctx context.Context, in *btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx context.Context, in *btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
return nil, nil
}
func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) {
return nil, nil
}

func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support snapshots")
}

iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()

cluster1 := btapb.Cluster{Name: "cluster1"}
failedLoc := "euro1"

iAdminClient.iClient = &instanceAdminClientMock{
Clusters: []*btapb.Cluster{&cluster1},
UnavailableLocations: []string{failedLoc},
}

cis, err := iAdminClient.Clusters(context.Background(), "instance-id")
convertedErr, ok := err.(ErrPartiallyUnavailable)
if !ok {
t.Fatalf("want error ErrPartiallyUnavailable, got other")
}
if got, want := len(convertedErr.Locations), 1; got != want {
t.Fatalf("want %v failed locations, got %v", want, got)
}
if got, want := convertedErr.Locations[0], failedLoc; got != want {
t.Fatalf("want failed location %v, got %v", want, got)
}
if got, want := len(cis), 1; got != want {
t.Fatalf("want %v failed locations, got %v", want, got)
}
if got, want := cis[0].Name, cluster1.Name; got != want {
t.Fatalf("want cluster %v, got %v", want, got)
}
}

func TestIntegration_Granularity(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down

0 comments on commit 28decb5

Please sign in to comment.