Skip to content

Commit

Permalink
GODRIVER-1957 Update connection pinning logic for LB mode (mongodb#633)
Browse files Browse the repository at this point in the history
  • Loading branch information
Divjot Arora authored and Mohammad Fahim Abrar committed Mar 17, 2022
1 parent e898358 commit 777d700
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 70 deletions.
13 changes: 3 additions & 10 deletions x/mongo/driver/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,17 +364,10 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
}
}

// If we're in load balanced mode and the pinned connection encounters a network error, it should be unpinned. In
// this case, we call Expire on the connection to ensure it's closed and the cursor will be reaped by the server.
// If we're in load balanced mode and the pinned connection encounters a network error, we should not use it for
// future commands. Per the spec, the connection will not be unpinned until the cursor is actually closed, but
// we set the cursor ID to 0 to ensure the Close() call will not execute a killCursors command.
if driverErr, ok := bc.err.(Error); ok && driverErr.NetworkError() && bc.connection != nil {
err := bc.connection.Expire()
if err != nil && bc.err == nil {
bc.err = err
}

// Also unset the connection and set the cursor ID to 0 because the cursor is no longer valid, so we shouldn't
// send any more commands for it.
bc.connection = nil
bc.id = 0
}

Expand Down
4 changes: 0 additions & 4 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,8 @@ type Connection interface {
// 1. Each Pin* call should increment the number of references for the connection.
// 2. Each Unpin* call should decrement the number of references for the connection.
// 3. Calls to Close() should be ignored until all resources have unpinned the connection.
// 4. Calls to Expire() should forcefully close the connection even if there are references that have not been unpinned.
// This method should be used to indicate that a PinnedConnection is no longer valid. Unpin* calls after Expire has
// been called should be ignored.
type PinnedConnection interface {
Connection
Expirable
PinToCursor() error
PinToTransaction() error
UnpinFromCursor() error
Expand Down
34 changes: 10 additions & 24 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
return err
}

if op.Client != nil {
if err := op.Client.StartCommand(); err != nil {
return err
}
}

srvr, conn, err := op.getServerAndConnection(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -419,26 +425,6 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
_ = ep.ProcessError(err, conn)
}

// If we're executing a load-balanced transaction and encounter a network error, the pinned connection should
// be unpinned. We call ExpirePinnedConnection to ensure that the connection is closed and returned to the
// pool for bookkeeping. Future AbortTransaction calls will check out a new connection, which is desired. We
// do this before any other checks to make sure we release the invalidated connection even though other
// resources may be holding references to it.
if op.Client != nil && op.Client.PinnedConnection != nil {
if driverErr, ok := err.(Error); ok && driverErr.NetworkError() {
_ = op.Client.ExpirePinnedConnection()
}
}

// If we're executing a load-balanced transaction and are committing/aborting, unpin the session's connection.
// This has to be done before entering the retryability logic because commit/abort attempts are retryable on a
// different mongos, so we want to allow for the possibility of checking out a new connection for the retry.
if op.Client != nil && (op.Client.Committing || op.Client.Aborting) && op.Client.PinnedConnection != nil {
if err := op.Client.UnpinConnection(); err != nil {
return err
}
}

finishedInfo.response = res
finishedInfo.cmdErr = err
op.publishFinishedEvent(ctx, finishedInfo)
Expand Down Expand Up @@ -527,7 +513,9 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
operationErr.Labels = tt.Labels
case Error:
if tt.HasErrorLabel(TransientTransactionError) || tt.HasErrorLabel(UnknownTransactionCommitResult) {
op.Client.ClearPinnedServer()
if err := op.Client.ClearPinnedResources(); err != nil {
return err
}
}
if e := err.(Error); retryable && op.Type == Write && e.UnsupportedStorageEngine() {
return ErrUnsupportedStorageEngine
Expand Down Expand Up @@ -1101,9 +1089,7 @@ func (op Operation) addSession(dst []byte, desc description.SelectedServer) ([]b
dst = bsoncore.AppendBooleanElement(dst, "autocommit", false)
}

client.ApplyCommand(desc.Server)

return dst, nil
return dst, client.ApplyCommand(desc.Server)
}

func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) []byte {
Expand Down
3 changes: 2 additions & 1 deletion x/mongo/driver/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func TestOperation(t *testing.T) {
noerr(t, err)
err = sessInProgressTransaction.StartTransaction(nil)
noerr(t, err)
sessInProgressTransaction.ApplyCommand(description.Server{})
err = sessInProgressTransaction.ApplyCommand(description.Server{})
noerr(t, err)

wcAck := writeconcern.New(writeconcern.WMajority())
wcUnack := writeconcern.New(writeconcern.W(0))
Expand Down
73 changes: 43 additions & 30 deletions x/mongo/driver/session/client_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ type LoadBalancedTransactionConnection interface {
Address() address.Address
Stale() bool

// Functions copied over from driver.Expirable.
Alive() bool
Expire() error

// Functions copied over from driver.PinnedConnection that are not part of Connection or Expirable.
PinToCursor() error
PinToTransaction() error
Expand Down Expand Up @@ -266,11 +262,23 @@ func (c *Client) UpdateRecoveryToken(response bson.Raw) {
c.RecoveryToken = token.Document()
}

// ClearPinnedServer sets the PinnedServer to nil.
func (c *Client) ClearPinnedServer() {
if c != nil {
c.PinnedServer = nil
// ClearPinnedResources clears the pinned server and/or connection associated with the session.
func (c *Client) ClearPinnedResources() error {
if c == nil {
return nil
}

c.PinnedServer = nil
if c.PinnedConnection != nil {
if err := c.PinnedConnection.UnpinFromTransaction(); err != nil {
return err
}
if err := c.PinnedConnection.Close(); err != nil {
return err
}
}
c.PinnedConnection = nil
return nil
}

// UnpinConnection gracefully unpins the connection associated with the session if there is one. This is done via
Expand All @@ -289,18 +297,6 @@ func (c *Client) UnpinConnection() error {
return err
}

// ExpirePinnedConnection forcefully unpins the connection assocated with the session if there is one. This is done via
// the pinned connection's Expire function.
func (c *Client) ExpirePinnedConnection() error {
if c == nil || c.PinnedConnection == nil {
return nil
}

err := c.PinnedConnection.Expire()
c.PinnedConnection = nil
return err
}

// EndSession ends the session.
func (c *Client) EndSession() {
if c.Terminated {
Expand Down Expand Up @@ -378,13 +374,12 @@ func (c *Client) StartTransaction(opts *TransactionOptions) error {
}

if !writeconcern.AckWrite(c.CurrentWc) {
c.clearTransactionOpts()
_ = c.clearTransactionOpts()
return ErrUnackWCUnsupported
}

c.TransactionState = Starting
c.PinnedServer = nil
return nil
return c.ClearPinnedResources()
}

// CheckCommitTransaction checks to see if allowed to commit transaction and returns
Expand Down Expand Up @@ -442,15 +437,30 @@ func (c *Client) AbortTransaction() error {
return err
}
c.TransactionState = Aborted
c.clearTransactionOpts()
return c.clearTransactionOpts()
}

// StartCommand updates the session's internal state at the beginning of an operation. This must be called before
// server selection is done for the operation as the session's state can impact the result of that process.
func (c *Client) StartCommand() error {
if c == nil {
return nil
}

// If we're executing the first operation using this session after a transaction, we must ensure that the session
// is not pinned to any resources.
if !c.TransactionRunning() && !c.Committing && !c.Aborting {
return c.ClearPinnedResources()
}
return nil
}

// ApplyCommand advances the state machine upon command execution.
func (c *Client) ApplyCommand(desc description.Server) {
// ApplyCommand advances the state machine upon command execution. This must be called after server selection is
// complete.
func (c *Client) ApplyCommand(desc description.Server) error {
if c.Committing {
// Do not change state if committing after already committed
return
return nil
}
if c.TransactionState == Starting {
c.TransactionState = InProgress
Expand All @@ -459,18 +469,21 @@ func (c *Client) ApplyCommand(desc description.Server) {
c.PinnedServer = &desc
}
} else if c.TransactionState == Committed || c.TransactionState == Aborted {
c.clearTransactionOpts()
c.TransactionState = None
return c.clearTransactionOpts()
}

return nil
}

func (c *Client) clearTransactionOpts() {
func (c *Client) clearTransactionOpts() error {
c.RetryingCommit = false
c.Aborting = false
c.Committing = false
c.CurrentWc = nil
c.CurrentRp = nil
c.CurrentRc = nil
c.PinnedServer = nil
c.RecoveryToken = nil

return c.ClearPinnedResources()
}
4 changes: 3 additions & 1 deletion x/mongo/driver/session/client_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
testhelpers "go.mongodb.org/mongo-driver/internal/testutil/helpers"
"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
Expand Down Expand Up @@ -149,7 +150,8 @@ func TestClientSession(t *testing.T) {
t.Errorf("expected error, got %v", err)
}

sess.ApplyCommand(description.Server{Kind: description.Standalone})
err = sess.ApplyCommand(description.Server{Kind: description.Standalone})
assert.Nil(t, err, "ApplyCommand error: %v", err)
if sess.TransactionState != InProgress {
t.Errorf("incorrect session state, expected InProgress, received %v", sess.TransactionState)
}
Expand Down

0 comments on commit 777d700

Please sign in to comment.