Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastore): Multi DB support #8276

Merged
merged 9 commits into from
Jul 26, 2023
10 changes: 8 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ variables:
bamboo-shift-455) for the general project.
- `GCLOUD_TESTS_GOLANG_KEY`: The path to the JSON key file of the general
project's service account.
- `GCLOUD_TESTS_GOLANG_DATASTORE_DATABASES`: Comma separated list of developer's Datastore databases. If not provided, default database i.e. empty string is used.
- `GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID`: Developers Console project's ID
(e.g. doorway-cliff-677) for the Firestore project.
- `GCLOUD_TESTS_GOLANG_FIRESTORE_KEY`: The path to the JSON key file of the
Expand Down Expand Up @@ -153,8 +154,9 @@ $ gcloud config set project $GCLOUD_TESTS_GOLANG_PROJECT_ID
# Authenticates the gcloud tool with your account.
$ gcloud auth login

# Create the indexes used in the datastore integration tests.
$ gcloud datastore indexes create datastore/testdata/index.yaml
# Create the indexes for all the databases you want to use in the datastore integration tests.
# Use empty string as databaseID or skip database flag for default database.
$ gcloud alpha datastore indexes create --database=your-databaseID-1 --project=cndb-sdk-golang-general testdata/index.yaml

# Creates a Google Cloud storage bucket with the same name as your test project,
# and with the Cloud Logging service account as owner, for the sink
Expand Down Expand Up @@ -219,6 +221,10 @@ export GCLOUD_TESTS_GOLANG_PROJECT_ID=your-project
# The path to the JSON key file of the general project's service account.
export GCLOUD_TESTS_GOLANG_KEY=~/directory/your-project-abcd1234.json

# Comma separated list of developer's Datastore databases. If not provided,
# default database i.e. empty string is used.
export GCLOUD_TESTS_GOLANG_DATASTORE_DATABASES=your-database-1,your-database-2

# Developers Console project's ID (e.g. doorway-cliff-677) for the Firestore project.
export GCLOUD_TESTS_GOLANG_FIRESTORE_PROJECT_ID=your-firestore-project

Expand Down
8 changes: 6 additions & 2 deletions datastore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ type datastoreClient struct {
md metadata.MD
}

func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.DatastoreClient {
func newDatastoreClient(conn grpc.ClientConnInterface, projectID, databaseID string) pb.DatastoreClient {
resourcePrefixValue := "projects/" + projectID
if databaseID != "" {
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
resourcePrefixValue += "/databases/" + databaseID
}
return &datastoreClient{
c: pb.NewDatastoreClient(conn),
md: metadata.Pairs(
resourcePrefixHeader, "projects/"+projectID,
resourcePrefixHeader, resourcePrefixValue,
"x-goog-api-client", fmt.Sprintf("gl-go/%s gccl/%s grpc/", version.Go(), internal.Version)),
}
}
Expand Down
55 changes: 42 additions & 13 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ const DetectProjectID = "*detect-project-id*"
// the resource being operated on.
const resourcePrefixHeader = "google-cloud-resource-prefix"

// DefaultDatabaseID is ID of the default database denoted by an empty string
const DefaultDatabaseID = ""

var (
gtransportDialPoolFn = gtransport.DialPool
detectProjectIDFn = detectProjectID
)

// Client is a client for reading and writing data in a datastore dataset.
type Client struct {
connPool gtransport.ConnPool
client pb.DatastoreClient
dataset string // Called dataset by the datastore API, synonym for project ID.
databaseID string // Default value is empty string
readSettings *readSettings
}

Expand All @@ -69,6 +78,21 @@ type Client struct {
// NewClient to detect the project ID from the credentials.
// Call (*Client).Close() when done with the client.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
client, err := NewClientWithDatabase(ctx, projectID, DefaultDatabaseID, opts...)
if err != nil {
return nil, err
}
return client, nil
}

// NewClientWithDatabase creates a new Client for given dataset and database.
// If the project ID is empty, it is derived from the DATASTORE_PROJECT_ID environment variable.
// If the DATASTORE_EMULATOR_HOST environment variable is set, client will use
// its value to connect to a locally-running datastore emulator.
// DetectProjectID can be passed as the projectID argument to instruct
// NewClientWithDatabase to detect the project ID from the credentials.
// Call (*Client).Close() when done with the client.
func NewClientWithDatabase(ctx context.Context, projectID, databaseID string, opts ...option.ClientOption) (*Client, error) {
var o []option.ClientOption
// Environment variables for gcd emulator:
// https://cloud.google.com/datastore/docs/tools/datastore-emulator
Expand All @@ -80,7 +104,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
option.WithGRPCDialOption(grpc.WithInsecure()),
}
if projectID == DetectProjectID {
projectID, _ = detectProjectID(ctx, opts...)
projectID, _ = detectProjectIDFn(ctx, opts...)
if projectID == "" {
projectID = "dummy-emulator-datastore-project"
}
Expand All @@ -106,7 +130,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
o = append(o, opts...)

if projectID == DetectProjectID {
detected, err := detectProjectID(ctx, opts...)
detected, err := detectProjectIDFn(ctx, opts...)
if err != nil {
return nil, err
}
Expand All @@ -116,15 +140,16 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
if projectID == "" {
return nil, errors.New("datastore: missing project/dataset id")
}
connPool, err := gtransport.DialPool(ctx, o...)
connPool, err := gtransportDialPoolFn(ctx, o...)
if err != nil {
return nil, fmt.Errorf("dialing: %w", err)
}
return &Client{
connPool: connPool,
client: newDatastoreClient(connPool, projectID),
client: newDatastoreClient(connPool, projectID, databaseID),
dataset: projectID,
readSettings: &readSettings{},
databaseID: databaseID,
}, nil
}

Expand Down Expand Up @@ -468,6 +493,7 @@ func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb
}
req := &pb.LookupRequest{
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Keys: pbKeys,
ReadOptions: opts,
}
Expand Down Expand Up @@ -565,9 +591,10 @@ func (c *Client) PutMulti(ctx context.Context, keys []*Key, src interface{}) (re

// Make the request.
req := &pb.CommitRequest{
ProjectId: c.dataset,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
}
resp, err := c.client.Commit(ctx, req)
if err != nil {
Expand Down Expand Up @@ -688,9 +715,10 @@ func (c *Client) DeleteMulti(ctx context.Context, keys []*Key) (err error) {
}

req := &pb.CommitRequest{
ProjectId: c.dataset,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Mutations: mutations,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
}
_, err = c.client.Commit(ctx, req)
return err
Expand Down Expand Up @@ -740,9 +768,10 @@ func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) (ret []*Key, err
return nil, err
}
req := &pb.CommitRequest{
ProjectId: c.dataset,
Mutations: pmuts,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Mutations: pmuts,
Mode: pb.CommitRequest_NON_TRANSACTIONAL,
}
resp, err := c.client.Commit(ctx, req)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,65 @@ import (

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
"google.golang.org/api/transport/grpc"
pb "google.golang.org/genproto/googleapis/datastore/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestNewClientWithDatabase(t *testing.T) {
origDetectProjectIDFn := detectProjectIDFn
origGtransportDialPoolFn := gtransportDialPoolFn
defer func() {
detectProjectIDFn = origDetectProjectIDFn
gtransportDialPoolFn = origGtransportDialPoolFn
}()

tests := []struct {
desc string
databaseID string
wantErr bool
detectProjectIDFn func(ctx context.Context, opts ...option.ClientOption) (string, error)
gtransportDialPoolFn func(ctx context.Context, opts ...option.ClientOption) (grpc.ConnPool, error)
}{
{
desc: "Error from detect project ID should not fail NewClientWithDatabase",
databaseID: "db1",
wantErr: false,
detectProjectIDFn: func(ctx context.Context, opts ...option.ClientOption) (string, error) {
return "", errors.New("mock error from detect project ID")
},
gtransportDialPoolFn: origGtransportDialPoolFn,
},
{
desc: "Error from DialPool",
databaseID: "db1",
wantErr: true,
detectProjectIDFn: origDetectProjectIDFn,
gtransportDialPoolFn: func(ctx context.Context, opts ...option.ClientOption) (grpc.ConnPool, error) {
return nil, errors.New("mock error from DialPool")
},
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
detectProjectIDFn = tc.detectProjectIDFn
gtransportDialPoolFn = tc.gtransportDialPoolFn

client, gotErr := NewClientWithDatabase(context.Background(), "my-project", tc.databaseID)
if gotErr != nil && !tc.wantErr {
t.Errorf("got error %v, but none was expected", gotErr)
} else if gotErr == nil && tc.wantErr {
t.Errorf("wanted error, but none returned")
} else if gotErr == nil && client.databaseID != tc.databaseID {
t.Errorf("got %s, want %s", client.databaseID, tc.databaseID)
}
})
}
}

func TestQueryConstruction(t *testing.T) {
tests := []struct {
q, exp *Query
Expand Down
54 changes: 45 additions & 9 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ var timeNow = time.Now()
// when the tests are run in parallel.
var suffix string

const replayFilename = "datastore.replay"
const (
replayFilename = "datastore.replay"
envDatabases = "GCLOUD_TESTS_GOLANG_DATASTORE_DATABASES"
)

type replayInfo struct {
ProjectID string
Expand All @@ -62,6 +65,7 @@ var (
newTestClient = func(ctx context.Context, t *testing.T) *Client {
return newClient(ctx, t, nil)
}
testParams map[string]interface{}
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -103,7 +107,25 @@ func testMain(m *testing.M) int {
log.Printf("recording to %s", replayFilename)
}
suffix = fmt.Sprintf("-t%d", timeNow.UnixNano())
return m.Run()

// Run tests on multiple databases
databasesStr, ok := os.LookupEnv(envDatabases)
if !ok {
databasesStr = ""
}
databaseIDs := strings.Split(databasesStr, ",")
testParams = make(map[string]interface{})

for _, databaseID := range databaseIDs {
log.Printf("Setting up tests to run on databaseID: %q\n", databaseID)
testParams["databaseID"] = databaseID
status := m.Run()
if status != 0 {
return status
}
}

return 0
}

func initReplay() {
Expand Down Expand Up @@ -133,9 +155,9 @@ func initReplay() {
}

opts := append(grpcHeadersEnforcer.CallOptions(), option.WithGRPCConn(conn))
client, err := NewClient(ctx, ri.ProjectID, opts...)
client, err := NewClientWithDatabase(ctx, ri.ProjectID, testParams["databaseID"].(string), opts...)
if err != nil {
t.Fatalf("NewClient: %v", err)
t.Fatalf("NewClientWithDatabase: %v", err)
}
return client
}
Expand All @@ -161,13 +183,27 @@ func newClient(ctx context.Context, t *testing.T, dialOpts []grpc.DialOption) *C
for _, opt := range dialOpts {
opts = append(opts, option.WithGRPCDialOption(opt))
}
client, err := NewClient(ctx, testutil.ProjID(), opts...)
client, err := NewClientWithDatabase(ctx, testutil.ProjID(), testParams["databaseID"].(string), opts...)
if err != nil {
t.Fatalf("NewClient: %v", err)
t.Fatalf("NewClientWithDatabase: %v", err)
}
return client
}

func TestIntegration_NewClient(t *testing.T) {
if testing.Short() {
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
client, err := NewClient(ctx, testutil.ProjID())
if err != nil {
t.Fatalf("NewClient: %v", err)
}
if client.databaseID != DefaultDatabaseID {
t.Fatalf("NewClient: got %s, want %s", client.databaseID, DefaultDatabaseID)
}
}

func TestIntegration_Basics(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
Expand Down Expand Up @@ -1487,13 +1523,13 @@ func TestIntegration_DetectProjectID(t *testing.T) {
}

// Use creds with project ID.
if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(creds)); err != nil {
t.Errorf("NewClient: %v", err)
if _, err := NewClientWithDatabase(ctx, DetectProjectID, testParams["databaseID"].(string), option.WithCredentials(creds)); err != nil {
t.Errorf("NewClientWithDatabase: %v", err)
}

ts := testutil.ErroringTokenSource{}
// Try to use creds without project ID.
_, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(ts))
_, err := NewClientWithDatabase(ctx, DetectProjectID, testParams["databaseID"].(string), option.WithTokenSource(ts))
if err == nil || err.Error() != "datastore: see the docs on DetectProjectID" {
t.Errorf("expected an error while using TokenSource that does not have a project ID")
}
Expand Down
5 changes: 3 additions & 2 deletions datastore/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ func (c *Client) AllocateIDs(ctx context.Context, keys []*Key) ([]*Key, error) {
}

req := &pb.AllocateIdsRequest{
ProjectId: c.dataset,
Keys: multiKeyToProto(keys),
ProjectId: c.dataset,
DatabaseId: c.databaseID,
Keys: multiKeyToProto(keys),
}
resp, err := c.client.AllocateIds(ctx, req)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ func (c *Client) Run(ctx context.Context, q *Query) *Iterator {
pageCursor: q.start,
entityCursor: q.start,
req: &pb.RunQueryRequest{
ProjectId: c.dataset,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
},
}

Expand Down Expand Up @@ -766,7 +767,8 @@ func (c *Client) RunAggregationQuery(ctx context.Context, aq *AggregationQuery)
}

req := &pb.RunAggregationQueryRequest{
ProjectId: c.dataset,
ProjectId: c.dataset,
DatabaseId: c.databaseID,
QueryType: &pb.RunAggregationQueryRequest_AggregationQuery{
AggregationQuery: &pb.AggregationQuery{
QueryType: &pb.AggregationQuery_NestedQuery{
Expand Down
Loading