From a42604a3a6ea90c38a2ff90d036a79fd070174fd Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Mon, 8 Jan 2024 17:09:16 +0530 Subject: [PATCH] feat(spanner): add directed reads feature (#7668) * feat(spanner): add auto generated proto changes for directed reads * feat(spanner): add code changes for directed read feature * feat(spanner): add integration tests for directed read options * feat(spanner): pass DirectedOptions set in client level for Query when request level options are not set * feat(spanner): add unit tests for directed reads * feat(spanner): reuse validate function for directed read options * feat(spanner): code refactoring and comments * feat(spanner): throw error when directed read options set in PDML * feat(spanner): add unit tests to validate errors during RW transaction and PDML * feat(spanner): return error when Directed Read Options set for Partitioned BatchReadOnlyTransaction * feat(spanner): modify error message * feat(spanner): refactor unit test name * fix(spanner): test case * fix(spanner): test case * feat(spanner): remove manual autogenerated code changes * feat(spanner): remove client side validations for directed read options * feat(spanner): skip test for emulator * feat(spanner): go mod tidy * feat(spanner): consider client level dro for BatchReadOnlyTransaction as transaction type is read-only * feat(spanner): code refactor * feat(spanner): remove unit test for partitioned update * feat(spanner): rename test --- spanner/batch.go | 76 +++++++------ spanner/client.go | 15 +++ spanner/client_test.go | 212 +++++++++++++++++++++++++++++------- spanner/integration_test.go | 134 +++++++++++++++++++++++ spanner/transaction.go | 103 +++++++++++------- 5 files changed, 425 insertions(+), 115 deletions(-) diff --git a/spanner/batch.go b/spanner/batch.go index c0ba5ebe77dd..8d3c1cb9fe70 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -151,14 +151,15 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex } // Prepare ReadRequest. req := &sppb.ReadRequest{ - Session: sid, - Transaction: ts, - Table: table, - Index: index, - Columns: columns, - KeySet: kset, - RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""), - DataBoostEnabled: readOptions.DataBoostEnabled, + Session: sid, + Transaction: ts, + Table: table, + Index: index, + Columns: columns, + KeySet: kset, + RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""), + DataBoostEnabled: readOptions.DataBoostEnabled, + DirectedReadOptions: readOptions.DirectedReadOptions, } // Generate partitions. for _, p := range resp.GetPartitions() { @@ -215,14 +216,15 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement // prepare ExecuteSqlRequest r := &sppb.ExecuteSqlRequest{ - Session: sid, - Transaction: ts, - Sql: statement.SQL, - Params: params, - ParamTypes: paramTypes, - QueryOptions: qOpts.Options, - RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""), - DataBoostEnabled: qOpts.DataBoostEnabled, + Session: sid, + Transaction: ts, + Sql: statement.SQL, + Params: params, + ParamTypes: paramTypes, + QueryOptions: qOpts.Options, + RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""), + DataBoostEnabled: qOpts.DataBoostEnabled, + DirectedReadOptions: qOpts.DirectedReadOptions, } // generate Partitions @@ -313,16 +315,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R if p.rreq != nil { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: p.rreq.Session, - Transaction: p.rreq.Transaction, - Table: p.rreq.Table, - Index: p.rreq.Index, - Columns: p.rreq.Columns, - KeySet: p.rreq.KeySet, - PartitionToken: p.pt, - RequestOptions: p.rreq.RequestOptions, - ResumeToken: resumeToken, - DataBoostEnabled: p.rreq.DataBoostEnabled, + Session: p.rreq.Session, + Transaction: p.rreq.Transaction, + Table: p.rreq.Table, + Index: p.rreq.Index, + Columns: p.rreq.Columns, + KeySet: p.rreq.KeySet, + PartitionToken: p.pt, + RequestOptions: p.rreq.RequestOptions, + ResumeToken: resumeToken, + DataBoostEnabled: p.rreq.DataBoostEnabled, + DirectedReadOptions: p.rreq.DirectedReadOptions, }) if err != nil { return client, err @@ -338,16 +341,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R } else { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{ - Session: p.qreq.Session, - Transaction: p.qreq.Transaction, - Sql: p.qreq.Sql, - Params: p.qreq.Params, - ParamTypes: p.qreq.ParamTypes, - QueryOptions: p.qreq.QueryOptions, - PartitionToken: p.pt, - RequestOptions: p.qreq.RequestOptions, - ResumeToken: resumeToken, - DataBoostEnabled: p.qreq.DataBoostEnabled, + Session: p.qreq.Session, + Transaction: p.qreq.Transaction, + Sql: p.qreq.Sql, + Params: p.qreq.Params, + ParamTypes: p.qreq.ParamTypes, + QueryOptions: p.qreq.QueryOptions, + PartitionToken: p.pt, + RequestOptions: p.qreq.RequestOptions, + ResumeToken: resumeToken, + DataBoostEnabled: p.qreq.DataBoostEnabled, + DirectedReadOptions: p.qreq.DirectedReadOptions, }) if err != nil { return client, err diff --git a/spanner/client.go b/spanner/client.go index 52cc320a74ff..c19f254a6243 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -103,6 +103,7 @@ type Client struct { bwo BatchWriteOptions ct *commonTags disableRouteToLeader bool + dro *sppb.DirectedReadOptions } // DatabaseName returns the full name of a database, e.g., @@ -186,6 +187,11 @@ type ClientConfig struct { // BatchTimeout specifies the timeout for a batch of sessions managed sessionClient. BatchTimeout time.Duration + + // ClientConfig options used to set the DirectedReadOptions for all ReadRequests + // and ExecuteSqlRequests for the Client which indicate which replicas or regions + // should be used for non-transactional reads or queries. + DirectedReadOptions *sppb.DirectedReadOptions } func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context { @@ -291,6 +297,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf bwo: config.BatchWriteOptions, ct: getCommonTags(sc), disableRouteToLeader: config.DisableRouteToLeader, + dro: config.DirectedReadOptions, } return c, nil } @@ -374,6 +381,8 @@ func (c *Client) Single() *ReadOnlyTransaction { t.sh = sh return nil } + t.txReadOnly.qo.DirectedReadOptions = c.dro + t.txReadOnly.ro.DirectedReadOptions = c.dro t.ct = c.ct return t } @@ -397,6 +406,8 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { t.txReadOnly.qo = c.qo t.txReadOnly.ro = c.ro t.txReadOnly.disableRouteToLeader = true + t.txReadOnly.qo.DirectedReadOptions = c.dro + t.txReadOnly.ro.DirectedReadOptions = c.dro t.ct = c.ct return t } @@ -465,6 +476,8 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound t.txReadOnly.qo = c.qo t.txReadOnly.ro = c.ro t.txReadOnly.disableRouteToLeader = true + t.txReadOnly.qo.DirectedReadOptions = c.dro + t.txReadOnly.ro.DirectedReadOptions = c.dro t.ct = c.ct return t, nil } @@ -496,6 +509,8 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) t.txReadOnly.qo = c.qo t.txReadOnly.ro = c.ro t.txReadOnly.disableRouteToLeader = true + t.txReadOnly.qo.DirectedReadOptions = c.dro + t.txReadOnly.ro.DirectedReadOptions = c.dro t.ct = c.ct return t } diff --git a/spanner/client_test.go b/spanner/client_test.go index c6e180a7c68e..c30fa2051dbd 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -555,13 +555,17 @@ func checkReqsForQueryOptions(t *testing.T, server InMemSpannerServer, qo QueryO t.Fatalf("Length mismatch, got %v, want %v", got, want) } - reqQueryOptions := sqlReqs[0].QueryOptions + sqlReq := sqlReqs[0] + reqQueryOptions := sqlReq.QueryOptions if got, want := reqQueryOptions.OptimizerVersion, qo.Options.OptimizerVersion; got != want { t.Fatalf("Optimizer version mismatch, got %v, want %v", got, want) } if got, want := reqQueryOptions.OptimizerStatisticsPackage, qo.Options.OptimizerStatisticsPackage; got != want { t.Fatalf("Optimizer statistics package mismatch, got %v, want %v", got, want) } + if got, want := sqlReq.DirectedReadOptions, qo.DirectedReadOptions; got.String() != want.String() { + t.Fatalf("Directed Read Options mismatch, got %v, want %v", got, want) + } } func testReadOptions(t *testing.T, iter *RowIterator, server InMemSpannerServer, ro ReadOptions) { @@ -604,6 +608,9 @@ func checkReqsForReadOptions(t *testing.T, server InMemSpannerServer, ro ReadOpt if got, want := reqRequestOptions.RequestTag, ro.RequestTag; got != want { t.Fatalf("Request tag mismatch, got %v, want %v", got, want) } + if got, want := sqlReq.DirectedReadOptions, ro.DirectedReadOptions; got.String() != want.String() { + t.Fatalf("Directed Read Options mismatch, got %v, want %v", got, want) + } } func checkReqsForTransactionOptions(t *testing.T, server InMemSpannerServer, txo TransactionOptions) { @@ -1036,6 +1043,129 @@ func TestClient_ReadOnlyTransaction_ReadOptions(t *testing.T) { } } +func TestClient_DirectedReadOptions(t *testing.T) { + directedReadOptions := &sppb.DirectedReadOptions{ + Replicas: &sppb.DirectedReadOptions_IncludeReplicas_{ + IncludeReplicas: &sppb.DirectedReadOptions_IncludeReplicas{ + ReplicaSelections: []*sppb.DirectedReadOptions_ReplicaSelection{ + { + Location: "us-west1", + Type: sppb.DirectedReadOptions_ReplicaSelection_READ_ONLY, + }, + }, + AutoFailoverDisabled: true, + }, + }, + } + + readOptionsTestCases := []ReadOptionsTestCase{ + { + name: "Client level", + clientDRO: directedReadOptions, + want: &ReadOptions{DirectedReadOptions: directedReadOptions}, + }, + { + name: "Read level", + read: &ReadOptions{DirectedReadOptions: directedReadOptions}, + want: &ReadOptions{DirectedReadOptions: directedReadOptions}, + }, + { + name: "Read level has precedence than client level", + clientDRO: &sppb.DirectedReadOptions{}, + read: &ReadOptions{DirectedReadOptions: directedReadOptions}, + want: &ReadOptions{DirectedReadOptions: directedReadOptions}, + }, + } + + queryOptionsTestCases := []QueryOptionsTestCase{ + { + name: "Client level", + clientDRO: directedReadOptions, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{}, DirectedReadOptions: directedReadOptions}, + }, + { + name: "Query level", + query: QueryOptions{DirectedReadOptions: directedReadOptions}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{}, DirectedReadOptions: directedReadOptions}, + }, + { + name: "Query level has precedence than client level", + clientDRO: &sppb.DirectedReadOptions{}, + query: QueryOptions{DirectedReadOptions: directedReadOptions}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{}, DirectedReadOptions: directedReadOptions}, + }, + } + + for _, tt := range readOptionsTestCases { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{DirectedReadOptions: tt.clientDRO}) + defer teardown() + + tx := client.ReadOnlyTransaction() + defer tx.Close() + + var iter *RowIterator + if tt.read == nil { + iter = tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}) + } else { + iter = tx.ReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, tt.read) + } + testReadOptions(t, iter, server.TestSpanner, *tt.want) + }) + } + + for _, tt := range queryOptionsTestCases { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{DirectedReadOptions: tt.clientDRO}) + defer teardown() + + var iter *RowIterator + if tt.query.DirectedReadOptions == nil { + iter = client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) + } else { + iter = client.Single().QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query) + } + testQueryOptions(t, iter, server.TestSpanner, tt.want) + }) + } + + ctx := context.Background() + directedReadOptionsForRW := &sppb.DirectedReadOptions{ + Replicas: &sppb.DirectedReadOptions_ExcludeReplicas_{ + ExcludeReplicas: &sppb.DirectedReadOptions_ExcludeReplicas{ + ReplicaSelections: []*sppb.DirectedReadOptions_ReplicaSelection{ + { + Location: "us-west1", + Type: sppb.DirectedReadOptions_ReplicaSelection_READ_ONLY, + }, + }, + }, + }, + } + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{DirectedReadOptions: directedReadOptionsForRW}) + defer teardown() + + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *ReadWriteTransaction) error { + iter := txn.ReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, &ReadOptions{DirectedReadOptions: directedReadOptions}) + testReadOptions(t, iter, server.TestSpanner, ReadOptions{DirectedReadOptions: directedReadOptions}) + return nil + }) + if err != nil { + t.Fatal(err) + } + + _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *ReadWriteTransaction) error { + iter := txn.QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), QueryOptions{DirectedReadOptions: directedReadOptions}) + testQueryOptions(t, iter, server.TestSpanner, QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{}, DirectedReadOptions: directedReadOptions}) + return nil + }) + if err != nil { + t.Fatal(err) + } +} + func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) { t.Parallel() @@ -3896,66 +4026,68 @@ func TestBatchReadOnlyTransactionFromID_ReadOptions(t *testing.T) { } type QueryOptionsTestCase struct { - name string - client QueryOptions - env QueryOptions - query QueryOptions - want QueryOptions + name string + client QueryOptions + clientDRO *sppb.DirectedReadOptions + env QueryOptions + query QueryOptions + want QueryOptions } func queryOptionsTestCases() []QueryOptionsTestCase { statsPkg := "latest" return []QueryOptionsTestCase{ { - "Client level", - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: nil}, - QueryOptions{Options: nil}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + name: "Client level", + client: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + env: QueryOptions{Options: nil}, + query: QueryOptions{Options: nil}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, }, { - "Environment level", - QueryOptions{Options: nil}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: nil}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + name: "Environment level", + client: QueryOptions{Options: nil}, + env: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + query: QueryOptions{Options: nil}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, }, { - "Query level", - QueryOptions{Options: nil}, - QueryOptions{Options: nil}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + name: "Query level", + client: QueryOptions{Options: nil}, + env: QueryOptions{Options: nil}, + query: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, }, { - "Environment level has precedence", - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: nil}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, + name: "Environment level has precedence", + client: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + env: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, + query: QueryOptions{Options: nil}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, }, { - "Query level has precedence than client level", - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: nil}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, + name: "Query level has precedence than client level", + client: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + env: QueryOptions{Options: nil}, + query: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, }, { - "Query level has highest precedence", - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, - QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, + name: "Query level has highest precedence", + client: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}}, + env: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}}, + query: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, + want: QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}}, }, } } type ReadOptionsTestCase struct { - name string - client *ReadOptions - read *ReadOptions - want *ReadOptions + name string + client *ReadOptions + clientDRO *sppb.DirectedReadOptions + read *ReadOptions + want *ReadOptions } func readOptionsTestCases() []ReadOptionsTestCase { diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 9356c0f9d046..c7d781fa09e5 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -5037,6 +5037,140 @@ func TestIntegration_Bit_Reversed_Sequence(t *testing.T) { } } +func TestIntegration_WithDirectedReadOptions_ReadOnlyTransaction(t *testing.T) { + t.Parallel() + // DirectedReadOptions for PG is supported, however we test only for Google SQL. + skipUnsupportedPGTest(t) + skipEmulatorTest(t) + + ctxTimeout := 5 * time.Minute + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + // Set up testing environment. + client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements]) + defer cleanup() + + directedReadOptions := &sppb.DirectedReadOptions{ + Replicas: &sppb.DirectedReadOptions_IncludeReplicas_{ + IncludeReplicas: &sppb.DirectedReadOptions_IncludeReplicas{ + ReplicaSelections: []*sppb.DirectedReadOptions_ReplicaSelection{ + { + Location: "us-west1", + Type: sppb.DirectedReadOptions_ReplicaSelection_READ_ONLY, + }, + }, + AutoFailoverDisabled: true, + }, + }, + } + + writes := []struct { + row []interface{} + ts time.Time + }{ + {row: []interface{}{1, "Marc", "Foo"}}, + {row: []interface{}{2, "Tars", "Bar"}}, + {row: []interface{}{3, "Alpha", "Beta"}}, + {row: []interface{}{4, "Last", "End"}}, + } + // Try to write four rows through the Apply API. + for i, w := range writes { + var err error + m := InsertOrUpdate("Singers", + []string{"SingerId", "FirstName", "LastName"}, + w.row) + if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { + t.Fatal(err) + } + } + + want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}} + + // Test DirectedReadOptions for ReadOnlyTransaction.ReadWithOptions + got, err := readAll(client.ReadOnlyTransaction().ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}, + &ReadOptions{DirectedReadOptions: directedReadOptions})) + if err != nil { + t.Errorf("DirectedReadOptions using ReadOptions returns error %v, want nil", err) + } + if !testEqual(got, want) { + t.Errorf("got unexpected result in DirectedReadOptions test: %v, want %v", got, want) + } + + // Test DirectedReadOptions for ReadOnlyTransaction.QueryWithOptions + singersQuery := "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@p1, @p2, @p3) ORDER BY SingerId" + got, err = readAll(client.Single().QueryWithOptions(ctx, Statement{ + singersQuery, + map[string]interface{}{"p1": int64(1), "p2": int64(3), "p3": int64(4)}, + }, QueryOptions{DirectedReadOptions: directedReadOptions})) + + if err != nil { + t.Errorf("DirectedReadOptions using QueryOptions returns error %v, want nil", err) + } + + if !testEqual(got, want) { + t.Errorf("got unexpected result in DirectedReadOptions test: %v, want %v", got, want) + } +} + +func TestIntegration_WithDirectedReadOptions_ReadWriteTransaction_ShouldThrowError(t *testing.T) { + t.Parallel() + // DirectedReadOptions for PG is supported, however we test only for Google SQL. + skipUnsupportedPGTest(t) + skipEmulatorTest(t) + + ctxTimeout := 5 * time.Minute + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + // Set up testing environment. + client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements]) + defer cleanup() + + directedReadOptions := &sppb.DirectedReadOptions{ + Replicas: &sppb.DirectedReadOptions_IncludeReplicas_{ + IncludeReplicas: &sppb.DirectedReadOptions_IncludeReplicas{ + ReplicaSelections: []*sppb.DirectedReadOptions_ReplicaSelection{ + { + Location: "us-west1", + Type: sppb.DirectedReadOptions_ReplicaSelection_READ_ONLY, + }, + }, + AutoFailoverDisabled: true, + }, + }, + } + + writes := []struct { + row []interface{} + ts time.Time + }{ + {row: []interface{}{1, "Marc", "Foo"}}, + {row: []interface{}{2, "Tars", "Bar"}}, + {row: []interface{}{3, "Alpha", "Beta"}}, + {row: []interface{}{4, "Last", "End"}}, + } + // Try to write four rows through the Apply API. + for i, w := range writes { + var err error + m := InsertOrUpdate("Singers", + []string{"SingerId", "FirstName", "LastName"}, + w.row) + if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { + t.Fatal(err) + } + } + + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { + _, err = readAll(tx.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{DirectedReadOptions: directedReadOptions})) + return err + }) + if err == nil { + t.Fatal("expected err, got nil") + } + if msg, ok := matchError(err, codes.InvalidArgument, "Directed reads can only be performed in a read-only transaction"); !ok { + t.Fatal(msg) + } +} + // Prepare initializes Cloud Spanner testing DB and clients. func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { return prepareDBAndClient(ctx, t, spc, statements, testDialect) diff --git a/spanner/transaction.go b/spanner/transaction.go index 875162e4e45c..b7df284f60a4 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -170,17 +170,22 @@ type ReadOptions struct { // If this is for a partitioned read and DataBoostEnabled field is set to true, the request will be executed // via Spanner independent compute resources. Setting this option for regular read operations has no effect. DataBoostEnabled bool + + // ReadOptions option used to set the DirectedReadOptions for all ReadRequests which indicate + // which replicas or regions should be used for running read operations. + DirectedReadOptions *sppb.DirectedReadOptions } // merge combines two ReadOptions that the input parameter will have higher // order of precedence. func (ro ReadOptions) merge(opts ReadOptions) ReadOptions { merged := ReadOptions{ - Index: ro.Index, - Limit: ro.Limit, - Priority: ro.Priority, - RequestTag: ro.RequestTag, - DataBoostEnabled: ro.DataBoostEnabled, + Index: ro.Index, + Limit: ro.Limit, + Priority: ro.Priority, + RequestTag: ro.RequestTag, + DataBoostEnabled: ro.DataBoostEnabled, + DirectedReadOptions: ro.DirectedReadOptions, } if opts.Index != "" { merged.Index = opts.Index @@ -197,6 +202,9 @@ func (ro ReadOptions) merge(opts ReadOptions) ReadOptions { if opts.DataBoostEnabled { merged.DataBoostEnabled = opts.DataBoostEnabled } + if opts.DirectedReadOptions != nil { + merged.DirectedReadOptions = opts.DirectedReadOptions + } return merged } @@ -228,6 +236,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key prio := t.ro.Priority requestTag := t.ro.RequestTag dataBoostEnabled := t.ro.DataBoostEnabled + directedReadOptions := t.ro.DirectedReadOptions if opts != nil { index = opts.Index if opts.Limit > 0 { @@ -238,6 +247,9 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key if opts.DataBoostEnabled { dataBoostEnabled = opts.DataBoostEnabled } + if opts.DirectedReadOptions != nil { + directedReadOptions = opts.DirectedReadOptions + } } var setTransactionID func(transactionID) if _, ok := ts.Selector.(*sppb.TransactionSelector_Begin); ok { @@ -254,16 +266,17 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key } client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: t.sh.getID(), - Transaction: t.getTransactionSelector(), - Table: table, - Index: index, - Columns: columns, - KeySet: kset, - ResumeToken: resumeToken, - Limit: int64(limit), - RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), - DataBoostEnabled: dataBoostEnabled, + Session: t.sh.getID(), + Transaction: t.getTransactionSelector(), + Table: table, + Index: index, + Columns: columns, + KeySet: kset, + ResumeToken: resumeToken, + Limit: int64(limit), + RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), + DataBoostEnabled: dataBoostEnabled, + DirectedReadOptions: directedReadOptions, }) if err != nil { if _, ok := t.getTransactionSelector().GetSelector().(*sppb.TransactionSelector_Begin); ok { @@ -378,17 +391,22 @@ type QueryOptions struct { // If this is for a partitioned query and DataBoostEnabled field is set to true, the request will be executed // via Spanner independent compute resources. Setting this option for regular query operations has no effect. DataBoostEnabled bool + + // QueryOptions option used to set the DirectedReadOptions for all ExecuteSqlRequests which indicate + // which replicas or regions should be used for executing queries. + DirectedReadOptions *sppb.DirectedReadOptions } // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { merged := QueryOptions{ - Mode: qo.Mode, - Options: &sppb.ExecuteSqlRequest_QueryOptions{}, - RequestTag: qo.RequestTag, - Priority: qo.Priority, - DataBoostEnabled: qo.DataBoostEnabled, + Mode: qo.Mode, + Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + RequestTag: qo.RequestTag, + Priority: qo.Priority, + DataBoostEnabled: qo.DataBoostEnabled, + DirectedReadOptions: qo.DirectedReadOptions, } if opts.Mode != nil { merged.Mode = opts.Mode @@ -402,6 +420,9 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { if opts.DataBoostEnabled { merged.DataBoostEnabled = opts.DataBoostEnabled } + if opts.DirectedReadOptions != nil { + merged.DirectedReadOptions = opts.DirectedReadOptions + } proto.Merge(merged.Options, qo.Options) proto.Merge(merged.Options, opts.Options) return merged @@ -430,9 +451,10 @@ func createRequestOptions(prio sppb.RequestOptions_Priority, requestTag, transac func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { mode := sppb.ExecuteSqlRequest_NORMAL return t.query(ctx, statement, QueryOptions{ - Mode: &mode, - Options: t.qo.Options, - Priority: t.qo.Priority, + Mode: &mode, + Options: t.qo.Options, + Priority: t.qo.Priority, + DirectedReadOptions: t.qo.DirectedReadOptions, }) } @@ -449,9 +471,10 @@ func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { mode := sppb.ExecuteSqlRequest_PROFILE return t.query(ctx, statement, QueryOptions{ - Mode: &mode, - Options: t.qo.Options, - Priority: t.qo.Priority, + Mode: &mode, + Options: t.qo.Options, + Priority: t.qo.Priority, + DirectedReadOptions: t.qo.DirectedReadOptions, }) } @@ -459,9 +482,10 @@ func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *R func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { mode := sppb.ExecuteSqlRequest_PLAN iter := t.query(ctx, statement, QueryOptions{ - Mode: &mode, - Options: t.qo.Options, - Priority: t.qo.Priority, + Mode: &mode, + Options: t.qo.Options, + Priority: t.qo.Priority, + DirectedReadOptions: t.qo.DirectedReadOptions, }) defer iter.Stop() for { @@ -544,16 +568,17 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti mode = *options.Mode } req := &sppb.ExecuteSqlRequest{ - Session: sid, - Transaction: ts, - Sql: stmt.SQL, - QueryMode: mode, - Seqno: atomic.AddInt64(&t.sequenceNumber, 1), - Params: params, - ParamTypes: paramTypes, - QueryOptions: options.Options, - RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag), - DataBoostEnabled: options.DataBoostEnabled, + Session: sid, + Transaction: ts, + Sql: stmt.SQL, + QueryMode: mode, + Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + Params: params, + ParamTypes: paramTypes, + QueryOptions: options.Options, + RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag), + DataBoostEnabled: options.DataBoostEnabled, + DirectedReadOptions: options.DirectedReadOptions, } return req, sh, nil }