Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2533 Fix data race from NumberSessionsInProgress. #1085

Merged
merged 4 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,10 @@ func (c *Client) Watch(ctx context.Context, pipeline interface{},
// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been
// closed (i.e. EndSession has not been called).
func (c *Client) NumberSessionsInProgress() int {
return c.sessionPool.CheckedOut()
// The underlying session pool uses an int64 for checkedOut to allow atomic
// access. We convert to an int here to maintain backward compatibility with
// older versions of the driver that did not atomically access checkedOut.
return int(c.sessionPool.CheckedOut())
}

// Timeout returns the timeout set for this client.
Expand Down
30 changes: 30 additions & 0 deletions mongo/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -475,6 +476,35 @@ func TestSessions(t *testing.T) {
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))

})

// Regression test for GODRIVER-2533. Note that this test assumes the race
// detector is enabled (GODRIVER-2072).
mt.Run("NumberSessionsInProgress data race", func(mt *mtest.T) {
// Use two goroutines to execute a few simultaneous runs of NumberSessionsInProgress
// and a basic collection operation (CountDocuments).
var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

for i := 0; i < 100; i++ {
time.Sleep(100 * time.Microsecond)
_ = mt.Client.NumberSessionsInProgress()
}
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
}()
go func() {
defer wg.Done()

for i := 0; i < 100; i++ {
time.Sleep(100 * time.Microsecond)
_, err := mt.Coll.CountDocuments(context.Background(), bson.D{})
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
assert.Nil(mt, err, "CountDocument error: %v", err)
}
}()

wg.Wait()
})
}

func assertCollectionCount(mt *mtest.T, expectedCount int64) {
Expand Down
16 changes: 9 additions & 7 deletions x/mongo/driver/session/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package session

import (
"sync"
"sync/atomic"

"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
Expand All @@ -29,13 +30,14 @@ type topologyDescription struct {

// Pool is a pool of server sessions that can be reused.
type Pool struct {
// number of sessions checked out of pool (accessed atomically)
checkedOut int64

descChan <-chan description.Topology
head *Node
tail *Node
latestTopology topologyDescription
mutex sync.Mutex // mutex to protect list and sessionTimeout

checkedOut int // number of sessions checked out of pool
}

func (p *Pool) createServerSession() (*Server, error) {
Expand All @@ -44,7 +46,7 @@ func (p *Pool) createServerSession() (*Server, error) {
return nil, err
}

p.checkedOut++
atomic.AddInt64(&p.checkedOut, 1)
return s, nil
}

Expand Down Expand Up @@ -100,7 +102,7 @@ func (p *Pool) GetSession() (*Server, error) {
p.head = p.head.next
}

p.checkedOut++
atomic.AddInt64(&p.checkedOut, 1)
return session, nil
}

Expand All @@ -118,7 +120,7 @@ func (p *Pool) ReturnSession(ss *Server) {
p.mutex.Lock()
defer p.mutex.Unlock()

p.checkedOut--
atomic.AddInt64(&p.checkedOut, -1)
p.updateTimeout()
// check sessions at end of queue for expired
// stop checking after hitting the first valid session
Expand Down Expand Up @@ -185,6 +187,6 @@ func (p *Pool) String() string {
}

// CheckedOut returns number of sessions checked out from pool.
func (p *Pool) CheckedOut() int {
return p.checkedOut
func (p *Pool) CheckedOut() int64 {
return atomic.LoadInt64(&p.checkedOut)
}