Skip to content

Commit

Permalink
Make Priority in QueryOptions effective and add related tests. (#6263)
Browse files Browse the repository at this point in the history
Co-authored-by: rahul2393 <[email protected]>
  • Loading branch information
tomo241 and rahul2393 authored Jul 4, 2022
1 parent 71bd273 commit f72abfe
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 8 deletions.
20 changes: 12 additions & 8 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ func createRequestOptions(prio sppb.RequestOptions_Priority, requestTag, transac
func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
mode := sppb.ExecuteSqlRequest_NORMAL
return t.query(ctx, statement, QueryOptions{
Mode: &mode,
Options: t.qo.Options,
Mode: &mode,
Options: t.qo.Options,
Priority: t.qo.Priority,
})
}

Expand All @@ -355,17 +356,19 @@ func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement,
func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
mode := sppb.ExecuteSqlRequest_PROFILE
return t.query(ctx, statement, QueryOptions{
Mode: &mode,
Options: t.qo.Options,
Mode: &mode,
Options: t.qo.Options,
Priority: t.qo.Priority,
})
}

// AnalyzeQuery returns the query plan for statement.
func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
mode := sppb.ExecuteSqlRequest_PLAN
iter := t.query(ctx, statement, QueryOptions{
Mode: &mode,
Options: t.qo.Options,
Mode: &mode,
Options: t.qo.Options,
Priority: t.qo.Priority,
})
defer iter.Stop()
for {
Expand Down Expand Up @@ -909,8 +912,9 @@ func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
mode := sppb.ExecuteSqlRequest_NORMAL
return t.update(ctx, stmt, QueryOptions{
Mode: &mode,
Options: t.qo.Options,
Mode: &mode,
Options: t.qo.Options,
Priority: t.qo.Priority,
})
}

Expand Down
74 changes: 74 additions & 0 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
. "cloud.google.com/go/spanner/internal/testutil"
"github.com/golang/protobuf/ptypes"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/genproto/googleapis/rpc/errdetails"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -621,6 +622,79 @@ func TestBatchDML_StatementBased_WithMultipleDML(t *testing.T) {
}
}

func TestPriorityInQueryOptions(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfigAndClientOptions(
t, ClientConfig{QueryOptions: QueryOptions{Priority: sppb.RequestOptions_PRIORITY_LOW}},
[]option.ClientOption{},
)
defer teardown()

tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
var iter *RowIterator
iter = tx.txReadOnly.Query(ctx, NewStatement("SELECT 1"))
err = iter.Do(func(r *Row) error { return nil })
if status.Code(err) != codes.Internal {
t.Fatalf("got unexpected error %v, expected Internal \"No result found for SELECT 1\"", err)
}
iter = tx.txReadOnly.QueryWithOptions(ctx, NewStatement("SELECT 1"),
QueryOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
err = iter.Do(func(r *Row) error { return nil })
if status.Code(err) != codes.Internal {
t.Fatalf("got unexpected error %v, expected Internal \"No result found for SELECT 1\"", err)
}
iter = tx.txReadOnly.QueryWithStats(ctx, NewStatement("SELECT 1"))
err = iter.Do(func(r *Row) error { return nil })
if status.Code(err) != codes.Internal {
t.Fatalf("got unexpected error %v, expected Internal \"No result found for SELECT 1\"", err)
}
_, err = tx.txReadOnly.AnalyzeQuery(ctx, NewStatement("SELECT 1"))
if status.Code(err) != codes.Internal {
t.Fatalf("got unexpected error %v, expected Internal \"No result found for SELECT 1\"", err)
}
if _, err = tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil {
tx.Rollback(ctx)
t.Fatal(err)
}
if _, err = tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM}); err != nil {
tx.Rollback(ctx)
t.Fatal(err)
}

gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
})
if err != nil {
t.Fatal(err)
}
if got, want := gotReqs[2].(*sppb.ExecuteSqlRequest).RequestOptions.Priority, sppb.RequestOptions_PRIORITY_LOW; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[3].(*sppb.ExecuteSqlRequest).RequestOptions.Priority, sppb.RequestOptions_PRIORITY_MEDIUM; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[4].(*sppb.ExecuteSqlRequest).RequestOptions.Priority, sppb.RequestOptions_PRIORITY_LOW; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[5].(*sppb.ExecuteSqlRequest).RequestOptions.Priority, sppb.RequestOptions_PRIORITY_LOW; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[6].(*sppb.ExecuteSqlRequest).RequestOptions.Priority, sppb.RequestOptions_PRIORITY_LOW; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[7].(*sppb.ExecuteSqlRequest).RequestOptions.Priority, sppb.RequestOptions_PRIORITY_MEDIUM; got != want {
t.Errorf("got %d, want %d", got, want)
}
}

// shouldHaveReceived asserts that exactly expectedRequests were present in
// the server's ReceivedRequests channel. It only looks at type, not contents.
//
Expand Down

0 comments on commit f72abfe

Please sign in to comment.