Skip to content

Commit

Permalink
feat(spanner): add support of multiplexed session support in writeAtl…
Browse files Browse the repository at this point in the history
…eastOnce mutations (#10646)

* feat(spanner): add support of multiplexed session support in mutations

* fix tests

* remove BatchWrite support

* revert unrelated changes
  • Loading branch information
rahul2393 authored Aug 7, 2024
1 parent 7797022 commit 54009ea
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 16 deletions.
48 changes: 36 additions & 12 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3555,15 +3555,21 @@ func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) {
if err != nil {
t.Fatal(err)
}
expectedIdleSesions := sp.incStep
if isMultiplexEnabled {
expectedIdleSesions = 0
}
sp.mu.Lock()
if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w {
if g, w := uint64(sp.idleList.Len())+sp.createReqs, expectedIdleSesions; g != w {
sp.mu.Unlock()
t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w)
}
expectedSessions := sp.incStep
expectedSessions := expectedIdleSesions
if isMultiplexEnabled {
expectedSessions++
}
if g, w := uint64(len(server.TestSpanner.DumpSessions())), expectedSessions; g != w {
sp.mu.Unlock()
t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w)
}
sp.mu.Unlock()
Expand Down Expand Up @@ -3602,14 +3608,20 @@ func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) {
t.Fatal(err)
}
sp.mu.Lock()
if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w {
expectedIdleSesions := sp.incStep
if isMultiplexEnabled {
expectedIdleSesions = 0
}
if g, w := uint64(sp.idleList.Len())+sp.createReqs, expectedIdleSesions; g != w {
sp.mu.Unlock()
t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w)
}
var countMuxSess uint64
if isMultiplexEnabled {
countMuxSess = 1
}
if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep+countMuxSess; g != w {
if g, w := uint64(len(server.TestSpanner.DumpSessions())), expectedIdleSesions+countMuxSess; g != w {
sp.mu.Unlock()
t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w)
}
sp.mu.Unlock()
Expand Down Expand Up @@ -6277,18 +6289,30 @@ func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{

expectedReqs := []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
&sppb.CommitRequest{},
}
muxCreateBuffer := 0
if isMultiplexEnabled {
muxCreateBuffer = 1
expectedReqs = []interface{}{
&sppb.CreateSessionRequest{},
&sppb.CommitRequest{},
}
}
if !requests[1+muxCreateBuffer].(*sppb.CommitRequest).Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests(expectedReqs, requests); err != nil {
t.Fatal(err)
}
for _, req := range requests {
if request, ok := req.(*sppb.CommitRequest); ok {
if !request.Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
if !testEqual(isMultiplexEnabled, strings.Contains(request.GetSession(), "multiplexed")) {
t.Errorf("TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams expected multiplexed session to be used, got: %v", request.GetSession())
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
for {
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// No usable session for doing the commit, take one from pool.
sh, err = t.sp.take(ctx)
sh, err = t.sp.takeMultiplexed(ctx)
if err != nil {
// sessionPool.Take already retries for session
// creations/retrivals.
Expand All @@ -1912,6 +1912,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
})
if err != nil && !isAbortedErr(err) {
// should not be the case with multiplexed sessions
if isSessionNotFoundError(err) {
// Discard the bad session.
sh.destroy()
Expand Down
23 changes: 20 additions & 3 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,30 @@ func TestApply_Single(t *testing.T) {
if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil {
t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e)
}

if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
requests := drainRequestsFromServer(server.TestSpanner)
expectedReqs := []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{},
}); err != nil {
}
if isMultiplexEnabled {
expectedReqs = []interface{}{
&sppb.CreateSessionRequest{},
&sppb.CommitRequest{},
}
}
if err := compareRequests(expectedReqs, requests); err != nil {
t.Fatal(err)
}
for _, s := range requests {
switch s.(type) {
case *sppb.CommitRequest:
req, _ := s.(*sppb.CommitRequest)
// Validate the session is multiplexed
if !testEqual(isMultiplexEnabled, strings.Contains(req.Session, "multiplexed")) {
t.Errorf("TestApply_Single expected multiplexed session to be used, got: %v", req.Session)
}
}
}
}

// Transaction retries on abort.
Expand Down

0 comments on commit 54009ea

Please sign in to comment.