Skip to content

Commit

Permalink
feat(spanner): add directed reads feature (#7668)
Browse files Browse the repository at this point in the history
* feat(spanner): add auto generated proto changes for directed reads

* feat(spanner): add code changes for directed read feature

* feat(spanner): add integration tests for directed read options

* feat(spanner): pass DirectedOptions set in client level for Query when request level options are not set

* feat(spanner): add unit tests for directed reads

* feat(spanner): reuse validate function for directed read options

* feat(spanner): code refactoring and comments

* feat(spanner): throw error when directed read options set in PDML

* feat(spanner): add unit tests to validate errors during RW transaction and PDML

* feat(spanner): return error when Directed Read Options set for Partitioned BatchReadOnlyTransaction

* feat(spanner): modify error message

* feat(spanner): refactor unit test name

* fix(spanner): test case

* fix(spanner): test case

* feat(spanner): remove manual autogenerated code changes

* feat(spanner): remove client side validations for directed read options

* feat(spanner): skip test for emulator

* feat(spanner): go mod tidy

* feat(spanner): consider client level dro for BatchReadOnlyTransaction as transaction type is read-only

* feat(spanner): code refactor

* feat(spanner): remove unit test for partitioned update

* feat(spanner): rename test
  • Loading branch information
harshachinta authored Jan 8, 2024
1 parent bbff8ac commit a42604a
Show file tree
Hide file tree
Showing 5 changed files with 425 additions and 115 deletions.
76 changes: 40 additions & 36 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,15 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
DataBoostEnabled: readOptions.DataBoostEnabled,
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
DataBoostEnabled: readOptions.DataBoostEnabled,
DirectedReadOptions: readOptions.DirectedReadOptions,
}
// Generate partitions.
for _, p := range resp.GetPartitions() {
Expand Down Expand Up @@ -215,14 +216,15 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
DataBoostEnabled: qOpts.DataBoostEnabled,
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
DataBoostEnabled: qOpts.DataBoostEnabled,
DirectedReadOptions: qOpts.DirectedReadOptions,
}

// generate Partitions
Expand Down Expand Up @@ -313,16 +315,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.rreq.DataBoostEnabled,
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.rreq.DataBoostEnabled,
DirectedReadOptions: p.rreq.DirectedReadOptions,
})
if err != nil {
return client, err
Expand All @@ -338,16 +341,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.qreq.DataBoostEnabled,
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.qreq.DataBoostEnabled,
DirectedReadOptions: p.qreq.DirectedReadOptions,
})
if err != nil {
return client, err
Expand Down
15 changes: 15 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Client struct {
bwo BatchWriteOptions
ct *commonTags
disableRouteToLeader bool
dro *sppb.DirectedReadOptions
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -186,6 +187,11 @@ type ClientConfig struct {

// BatchTimeout specifies the timeout for a batch of sessions managed sessionClient.
BatchTimeout time.Duration

// ClientConfig options used to set the DirectedReadOptions for all ReadRequests
// and ExecuteSqlRequests for the Client which indicate which replicas or regions
// should be used for non-transactional reads or queries.
DirectedReadOptions *sppb.DirectedReadOptions
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down Expand Up @@ -291,6 +297,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
bwo: config.BatchWriteOptions,
ct: getCommonTags(sc),
disableRouteToLeader: config.DisableRouteToLeader,
dro: config.DirectedReadOptions,
}
return c, nil
}
Expand Down Expand Up @@ -374,6 +381,8 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
t.txReadOnly.qo.DirectedReadOptions = c.dro
t.txReadOnly.ro.DirectedReadOptions = c.dro
t.ct = c.ct
return t
}
Expand All @@ -397,6 +406,8 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = true
t.txReadOnly.qo.DirectedReadOptions = c.dro
t.txReadOnly.ro.DirectedReadOptions = c.dro
t.ct = c.ct
return t
}
Expand Down Expand Up @@ -465,6 +476,8 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = true
t.txReadOnly.qo.DirectedReadOptions = c.dro
t.txReadOnly.ro.DirectedReadOptions = c.dro
t.ct = c.ct
return t, nil
}
Expand Down Expand Up @@ -496,6 +509,8 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = true
t.txReadOnly.qo.DirectedReadOptions = c.dro
t.txReadOnly.ro.DirectedReadOptions = c.dro
t.ct = c.ct
return t
}
Expand Down
Loading

0 comments on commit a42604a

Please sign in to comment.