Skip to content

Commit

Permalink
feat(datastore): Multi DB support (#8276)
Browse files Browse the repository at this point in the history
* feat(datastore): Multiple databases support

* feat(datastore): Multi DB support

* feat(datastore): Multi DB support

* feat(datastore): Correcting documentation

* feat(datastore): Run integration tests on DBs
  • Loading branch information
bhshkh authored Jul 26, 2023
1 parent e414e02 commit e4d07a0
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 31 deletions.
10 changes: 8 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ variables:
bamboo-shift-455) for the general project.
- `GCLOUD_TESTS_GOLANG_KEY`: The path to the JSON key file of the general
project's service account.
- `GCLOUD_TESTS_GOLANG_DATASTORE_DATABASES`: Comma separated list of developer's Datastore databases. If not provided, default database i.e. empty string is used.
- `GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID`: Developers Console project's ID
(e.g. doorway-cliff-677) for the Firestore project.
- `GCLOUD_TESTS_GOLANG_FIRESTORE_KEY`: The path to the JSON key file of the
Expand Down Expand Up @@ -153,8 +154,9 @@ $ gcloud config set project $GCLOUD_TESTS_GOLANG_PROJECT_ID
# Authenticates the gcloud tool with your account.
$ gcloud auth login

# Create the indexes used in the datastore integration tests.
$ gcloud datastore indexes create datastore/testdata/index.yaml
# Create the indexes for all the databases you want to use in the datastore integration tests.
# Use empty string as databaseID or skip database flag for default database.
$ gcloud alpha datastore indexes create --database=your-databaseID-1 --project=cndb-sdk-golang-general testdata/index.yaml

# Creates a Google Cloud storage bucket with the same name as your test project,
# and with the Cloud Logging service account as owner, for the sink
Expand Down Expand Up @@ -219,6 +221,10 @@ export GCLOUD_TESTS_GOLANG_PROJECT_ID=your-project
# The path to the JSON key file of the general project's service account.
export GCLOUD_TESTS_GOLANG_KEY=~/directory/your-project-abcd1234.json

# Comma separated list of developer's Datastore databases. If not provided,
# default database i.e. empty string is used.
export GCLOUD_TESTS_GOLANG_DATASTORE_DATABASES=your-database-1,your-database-2

# Developers Console project's ID (e.g. doorway-cliff-677) for the Firestore project.
export GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID=your-firestore-project

Expand Down
8 changes: 6 additions & 2 deletions datastore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ type datastoreClient struct {
md metadata.MD
}

func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.DatastoreClient {
func newDatastoreClient(conn grpc.ClientConnInterface, projectID, databaseID string) pb.DatastoreClient {
resourcePrefixValue := "projects/" + projectID
if databaseID != "" {
resourcePrefixValue += "/databases/" + databaseID
}
return &datastoreClient{
c: pb.NewDatastoreClient(conn),
md: metadata.Pairs(
resourcePrefixHeader, "projects/"+projectID,
resourcePrefixHeader, resourcePrefixValue,
"x-goog-api-client", fmt.Sprintf("gl-go/%s gccl/%s grpc/", version.Go(), internal.Version)),
}
}
Expand Down
55 changes: 42 additions & 13 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ const DetectProjectID = "*detect-project-id*"
// the resource being operated on.
const resourcePrefixHeader = "google-cloud-resource-prefix"

// DefaultDatabaseID is ID of the default database denoted by an empty string
const DefaultDatabaseID = ""

var (
gtransportDialPoolFn = gtransport.DialPool
detectProjectIDFn = detectProjectID
)

// Client is a client for reading and writing data in a datastore dataset.
type Client struct {
connPool gtransport.ConnPool
client pb.DatastoreClient
dataset string // Called dataset by the datastore API, synonym for project ID.
databaseID string // Default value is empty string
readSettings *readSettings
}

Expand All @@ -69,6 +78,21 @@ type Client struct {
// NewClient to detect the project ID from the credentials.
// Call (*Client).Close() when done with the client.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
client, err := NewClientWithDatabase(ctx, projectID, DefaultDatabaseID, opts...)
if err != nil {
return nil, err
}
return client, nil
}

// NewClientWithDatabase creates a new Client for given dataset and database.
// If the project ID is empty, it is derived from the DATASTORE_PROJECT_ID environment variable.
// If the DATASTORE_EMULATOR_HOST environment variable is set, client will use
// its value to connect to a locally-running datastore emulator.
// DetectProjectID can be passed as the projectID argument to instruct
// NewClientWithDatabase to detect the project ID from the credentials.
// Call (*Client).Close() when done with the client.
func NewClientWithDatabase(ctx context.Context, projectID, databaseID string, opts ...option.ClientOption) (*Client, error) {
var o []option.ClientOption
// Environment variables for gcd emulator:
// https://cloud.google.com/datastore/docs/tools/datastore-emulator
Expand All @@ -80,7 +104,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
option.WithGRPCDialOption(grpc.WithInsecure()),
}
if projectID == DetectProjectID {
projectID, _ = detectProjectID(ctx, opts...)
projectID, _ = detectProjectIDFn(ctx, opts...)
if projectID == "" {
projectID = "dummy-emulator-datastore-project"
}
Expand All @@ -106,7 +130,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
o = append(o, opts...)

if projectID == DetectProjectID {
detected, err := detectProjectID(ctx, opts...)
detected, err := detectProjectIDFn(ctx, opts...)
if err != nil {
return nil, err
}
Expand All @@ -116,15 +140,16 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
if projectID == "" {
return nil, errors.New("datastore: missing project/dataset id")
}
connPool, err := gtransport.DialPool(ctx, o...)
connPool, err := gtransportDialPoolFn(ctx, o...)
if err != nil {
return nil, fmt.Errorf("dialing: %w", err)
}
return &Client{
connPool: connPool,
client: newDatastoreClient(connPool, projectID),
client: newDatastoreClient(connPool, projectID, databaseID),
dataset: projectID,
readSettings: &readSettings{},
databaseID: databaseID,
}, nil
}

Expand Down Expand Up @@ -468,6 +493,7 @@ func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb
}
req := &pb.LookupRequest{
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Keys: pbKeys,
ReadOptions: opts,
}
Expand Down Expand Up @@ -565,9 +591,10 @@ func (c *Client) PutMulti(ctx context.Context, keys []*Key, src interface{}) (re

// Make the request.
req := &pb.CommitRequest{
ProjectId: c.dataset,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
}
resp, err := c.client.Commit(ctx, req)
if err != nil {
Expand Down Expand Up @@ -688,9 +715,10 @@ func (c *Client) DeleteMulti(ctx context.Context, keys []*Key) (err error) {
}

req := &pb.CommitRequest{
ProjectId: c.dataset,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
}
_, err = c.client.Commit(ctx, req)
return err
Expand Down Expand Up @@ -740,9 +768,10 @@ func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) (ret []*Key, err
return nil, err
}
req := &pb.CommitRequest{
ProjectId: c.dataset,
Mutations: pmuts,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Mutations: pmuts,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
}
resp, err := c.client.Commit(ctx, req)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,65 @@ import (

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
"google.golang.org/api/transport/grpc"
pb "google.golang.org/genproto/googleapis/datastore/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestNewClientWithDatabase(t *testing.T) {
origDetectProjectIDFn := detectProjectIDFn
origGtransportDialPoolFn := gtransportDialPoolFn
defer func() {
detectProjectIDFn = origDetectProjectIDFn
gtransportDialPoolFn = origGtransportDialPoolFn
}()

tests := []struct {
desc string
databaseID string
wantErr bool
detectProjectIDFn func(ctx context.Context, opts ...option.ClientOption) (string, error)
gtransportDialPoolFn func(ctx context.Context, opts ...option.ClientOption) (grpc.ConnPool, error)
}{
{
desc: "Error from detect project ID should not fail NewClientWithDatabase",
databaseID: "db1",
wantErr: false,
detectProjectIDFn: func(ctx context.Context, opts ...option.ClientOption) (string, error) {
return "", errors.New("mock error from detect project ID")
},
gtransportDialPoolFn: origGtransportDialPoolFn,
},
{
desc: "Error from DialPool",
databaseID: "db1",
wantErr: true,
detectProjectIDFn: origDetectProjectIDFn,
gtransportDialPoolFn: func(ctx context.Context, opts ...option.ClientOption) (grpc.ConnPool, error) {
return nil, errors.New("mock error from DialPool")
},
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
detectProjectIDFn = tc.detectProjectIDFn
gtransportDialPoolFn = tc.gtransportDialPoolFn

client, gotErr := NewClientWithDatabase(context.Background(), "my-project", tc.databaseID)
if gotErr != nil && !tc.wantErr {
t.Errorf("got error %v, but none was expected", gotErr)
} else if gotErr == nil && tc.wantErr {
t.Errorf("wanted error, but none returned")
} else if gotErr == nil && client.databaseID != tc.databaseID {
t.Errorf("got %s, want %s", client.databaseID, tc.databaseID)
}
})
}
}

func TestQueryConstruction(t *testing.T) {
tests := []struct {
q, exp *Query
Expand Down
54 changes: 45 additions & 9 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ var timeNow = time.Now()
// when the tests are run in parallel.
var suffix string

const replayFilename = "datastore.replay"
const (
replayFilename = "datastore.replay"
envDatabases = "GCLOUD_TESTS_GOLANG_DATASTORE_DATABASES"
)

type replayInfo struct {
ProjectID string
Expand All @@ -62,6 +65,7 @@ var (
newTestClient = func(ctx context.Context, t *testing.T) *Client {
return newClient(ctx, t, nil)
}
testParams map[string]interface{}
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -103,7 +107,25 @@ func testMain(m *testing.M) int {
log.Printf("recording to %s", replayFilename)
}
suffix = fmt.Sprintf("-t%d", timeNow.UnixNano())
return m.Run()

// Run tests on multiple databases
databaseIDs := []string{DefaultDatabaseID}
databasesStr, ok := os.LookupEnv(envDatabases)
if ok {
databaseIDs = append(databaseIDs, strings.Split(databasesStr, ",")...)
}

testParams = make(map[string]interface{})
for _, databaseID := range databaseIDs {
log.Printf("Setting up tests to run on databaseID: %q\n", databaseID)
testParams["databaseID"] = databaseID
status := m.Run()
if status != 0 {
return status
}
}

return 0
}

func initReplay() {
Expand Down Expand Up @@ -133,9 +155,9 @@ func initReplay() {
}

opts := append(grpcHeadersEnforcer.CallOptions(), option.WithGRPCConn(conn))
client, err := NewClient(ctx, ri.ProjectID, opts...)
client, err := NewClientWithDatabase(ctx, ri.ProjectID, testParams["databaseID"].(string), opts...)
if err != nil {
t.Fatalf("NewClient: %v", err)
t.Fatalf("NewClientWithDatabase: %v", err)
}
return client
}
Expand All @@ -161,13 +183,27 @@ func newClient(ctx context.Context, t *testing.T, dialOpts []grpc.DialOption) *C
for _, opt := range dialOpts {
opts = append(opts, option.WithGRPCDialOption(opt))
}
client, err := NewClient(ctx, testutil.ProjID(), opts...)
client, err := NewClientWithDatabase(ctx, testutil.ProjID(), testParams["databaseID"].(string), opts...)
if err != nil {
t.Fatalf("NewClient: %v", err)
t.Fatalf("NewClientWithDatabase: %v", err)
}
return client
}

func TestIntegration_NewClient(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
client, err := NewClient(ctx, testutil.ProjID())
if err != nil {
t.Fatalf("NewClient: %v", err)
}
if client.databaseID != DefaultDatabaseID {
t.Fatalf("NewClient: got %s, want %s", client.databaseID, DefaultDatabaseID)
}
}

func TestIntegration_Basics(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
Expand Down Expand Up @@ -1487,13 +1523,13 @@ func TestIntegration_DetectProjectID(t *testing.T) {
}

// Use creds with project ID.
if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(creds)); err != nil {
t.Errorf("NewClient: %v", err)
if _, err := NewClientWithDatabase(ctx, DetectProjectID, testParams["databaseID"].(string), option.WithCredentials(creds)); err != nil {
t.Errorf("NewClientWithDatabase: %v", err)
}

ts := testutil.ErroringTokenSource{}
// Try to use creds without project ID.
_, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(ts))
_, err := NewClientWithDatabase(ctx, DetectProjectID, testParams["databaseID"].(string), option.WithTokenSource(ts))
if err == nil || err.Error() != "datastore: see the docs on DetectProjectID" {
t.Errorf("expected an error while using TokenSource that does not have a project ID")
}
Expand Down
5 changes: 3 additions & 2 deletions datastore/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ func (c *Client) AllocateIDs(ctx context.Context, keys []*Key) ([]*Key, error) {
}

req := &pb.AllocateIdsRequest{
ProjectId: c.dataset,
Keys: multiKeyToProto(keys),
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Keys: multiKeyToProto(keys),
}
resp, err := c.client.AllocateIds(ctx, req)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ func (c *Client) Run(ctx context.Context, q *Query) *Iterator {
pageCursor: q.start,
entityCursor: q.start,
req: &pb.RunQueryRequest{
ProjectId: c.dataset,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
},
}

Expand Down Expand Up @@ -766,7 +767,8 @@ func (c *Client) RunAggregationQuery(ctx context.Context, aq *AggregationQuery)
}

req := &pb.RunAggregationQueryRequest{
ProjectId: c.dataset,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
QueryType: &pb.RunAggregationQueryRequest_AggregationQuery{
AggregationQuery: &pb.AggregationQuery{
QueryType: &pb.AggregationQuery_NestedQuery{
Expand Down
Loading

0 comments on commit e4d07a0

Please sign in to comment.