diff --git a/spanner/client.go b/spanner/client.go index de67e864e678..5d3d078a5b0d 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -876,6 +876,8 @@ type applyOption struct { // will not be recorded in allowed tracking change streams with DDL option // allow_txn_exclusion=true. excludeTxnFromChangeStreams bool + // commitOptions is the commit options to use for the commit operation. + commitOptions CommitOptions } // An ApplyOption is an optional argument to Apply. @@ -921,6 +923,13 @@ func ExcludeTxnFromChangeStreams() ApplyOption { } } +// ApplyCommitOptions returns an ApplyOption that sets the commit options to use for the commit operation. +func ApplyCommitOptions(co CommitOptions) ApplyOption { + return func(ao *applyOption) { + ao.commitOptions = co + } +} + // Apply applies a list of mutations atomically to the database. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { ao := &applyOption{} @@ -939,10 +948,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) if !ao.atLeastOnce { resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { return t.BufferWrite(ms) - }, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}) + }, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, CommitOptions: ao.commitOptions}) return resp.CommitTs, err } - t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams} + t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, commitOptions: ao.commitOptions} return t.applyAtLeastOnce(ctx, ms...) } diff --git a/spanner/client_test.go b/spanner/client_test.go index 6d2870cdfcfc..a25aa26c7610 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -44,6 +44,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" structpb "google.golang.org/protobuf/types/known/structpb" vkit "cloud.google.com/go/spanner/apiv1" @@ -3533,6 +3534,29 @@ func TestClient_ApplyAtLeastOnce(t *testing.T) { if err != nil { t.Fatal(err) } + requests := drainRequestsFromServer(server.TestSpanner) + for _, req := range requests { + if r, ok := req.(*sppb.CommitRequest); ok { + if r.MaxCommitDelay != nil { + t.Fatalf("unexpected MaxCommitDelay: %v", r.MaxCommitDelay) + } + } + } + + // Using Max commit delay + duration := 1 * time.Millisecond + _, err = client.Apply(context.Background(), ms, ApplyAtLeastOnce(), ApplyCommitOptions(CommitOptions{MaxCommitDelay: &duration})) + if err != nil { + t.Fatal(err) + } + requests = drainRequestsFromServer(server.TestSpanner) + for _, req := range requests { + if r, ok := req.(*sppb.CommitRequest); ok { + if r.MaxCommitDelay.GetNanos() != durationpb.New(duration).GetNanos() { + t.Fatalf("unexpected MaxCommitDelay: %v", r.MaxCommitDelay) + } + } + } } func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) { @@ -5319,8 +5343,16 @@ func TestClient_Apply_Tagging(t *testing.T) { server, client, teardown := setupMockedTestServer(t) defer teardown() - client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}) + duration := time.Millisecond + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyCommitOptions(CommitOptions{MaxCommitDelay: &duration})) checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{}) + for _, req := range drainRequestsFromServer(server.TestSpanner) { + if commitReq, ok := req.(*sppb.CommitRequest); ok { + if commitReq.MaxCommitDelay.GetNanos() != durationpb.New(duration).GetNanos() { + t.Fatalf("Missing MaxCommitDelay in commit request") + } + } + } client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag")) checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{TransactionTag: "tx-tag"}) @@ -5330,6 +5362,9 @@ func TestClient_Apply_Tagging(t *testing.T) { client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag")) checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{TransactionTag: "tx-tag"}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag")) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{TransactionTag: "tx-tag"}) } func TestClient_PartitionQuery_RequestOptions(t *testing.T) { diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 82c4175f29a2..daece9034eca 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -1499,7 +1499,11 @@ func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) { Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), } - if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { + duration, err := time.ParseDuration("100ms") + if err != nil { + t.Fatal(err) + } + if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce(), ApplyCommitOptions(CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration})); err != nil { t.Fatal(err) } diff --git a/spanner/transaction.go b/spanner/transaction.go index 20a3e4037bd9..f251adca1be2 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -1859,6 +1859,8 @@ type writeOnlyTransaction struct { // current transaction from the allowed tracking change streams with DDL option // allow_txn_exclusion=true. excludeTxnFromChangeStreams bool + // commitOptions are applied to the Commit request for the writeOnlyTransaction.. + commitOptions CommitOptions } // applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, @@ -1883,6 +1885,11 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta return ts, err } + var maxCommitDelay *durationpb.Duration + if t.commitOptions.MaxCommitDelay != nil { + maxCommitDelay = durationpb.New(*(t.commitOptions.MaxCommitDelay)) + } + // Make a retryer for Aborted and certain Internal errors. retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal) // Apply the mutation and retry if the commit is aborted. @@ -1910,6 +1917,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta }, Mutations: mPb, RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag), + MaxCommitDelay: maxCommitDelay, }) if err != nil && !isAbortedErr(err) { // should not be the case with multiplexed sessions