Skip to content

Commit

Permalink
GODRIVER-2037 Don't clear the connection pool on client-side connect …
Browse files Browse the repository at this point in the history
…timeout errors. (#688)
  • Loading branch information
matthewdale authored Jun 30, 2021
1 parent 6d6365b commit 5199a0b
Show file tree
Hide file tree
Showing 8 changed files with 535 additions and 65 deletions.
59 changes: 59 additions & 0 deletions mongo/integration/primary_stepdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"sync"
"testing"

"go.mongodb.org/mongo-driver/bson"
Expand All @@ -23,7 +24,65 @@ const (
errorInterruptedAtShutdown int32 = 11600
)

// testPoolMonitor exposes an *event.PoolMonitor and collects all events logged to that
// *event.PoolMonitor. It is safe to use from multiple concurrent goroutines.
type testPoolMonitor struct {
	*event.PoolMonitor

	// events holds every PoolEvent delivered to the embedded PoolMonitor, in
	// arrival order. Guarded by mu; read via Events, written by the Event callback.
	events []*event.PoolEvent
	mu     sync.RWMutex
}

// newTestPoolMonitor returns a testPoolMonitor whose embedded *event.PoolMonitor appends
// every received pool event to the monitor's internal, mutex-guarded event slice.
func newTestPoolMonitor() *testPoolMonitor {
	monitor := &testPoolMonitor{
		events: []*event.PoolEvent{},
	}
	record := func(evt *event.PoolEvent) {
		monitor.mu.Lock()
		defer monitor.mu.Unlock()
		monitor.events = append(monitor.events, evt)
	}
	monitor.PoolMonitor = &event.PoolMonitor{Event: record}
	return monitor
}

// Events returns a copy of the events collected by the testPoolMonitor. Filters can optionally be
// applied to the returned events set and are applied using AND logic (i.e. all filters must return
// true to include the event in the result).
func (tpm *testPoolMonitor) Events(filters ...func(*event.PoolEvent) bool) []*event.PoolEvent {
	tpm.mu.RLock()
	defer tpm.mu.RUnlock()

	// Size the result only while holding the lock: reading len(tpm.events) before RLock
	// is a data race with the append in the Event callback (caught by `go test -race`).
	filtered := make([]*event.PoolEvent, 0, len(tpm.events))
	for _, evt := range tpm.events {
		keep := true
		for _, filter := range filters {
			if !filter(evt) {
				keep = false
				break
			}
		}
		if keep {
			filtered = append(filtered, evt)
		}
	}

	return filtered
}

// IsPoolCleared returns true if there are any events of type "event.PoolCleared" in the events
// recorded by the testPoolMonitor.
func (tpm *testPoolMonitor) IsPoolCleared() bool {
	isCleared := func(evt *event.PoolEvent) bool {
		return evt.Type == event.PoolCleared
	}
	return len(tpm.Events(isCleared)) > 0
}

var poolChan = make(chan *event.PoolEvent, 100)

// TODO(GODRIVER-2068): Replace all uses of poolMonitor with individual instances of testPoolMonitor.
var poolMonitor = &event.PoolMonitor{
Event: func(event *event.PoolEvent) {
poolChan <- event
Expand Down
160 changes: 106 additions & 54 deletions mongo/integration/sdam_error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ func TestSDAMErrorHandling(t *testing.T) {
return options.Client().
ApplyURI(mtest.ClusterURI()).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc)
}
baseMtOpts := func() *mtest.Options {
mtOpts := mtest.NewOptions().
Topologies(mtest.ReplicaSet). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
MinServerVersion("4.0"). // 4.0+ is required to use failpoints on replica sets.
Topologies(mtest.ReplicaSet, mtest.Single). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
MinServerVersion("4.0"). // 4.0+ is required to use failpoints on replica sets.
ClientOptions(baseClientOpts())

if mtest.ClusterTopologyKind() == mtest.Sharded {
Expand All @@ -48,13 +47,14 @@ func TestSDAMErrorHandling(t *testing.T) {
// blockConnection and appName.
mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
// Assert that the pool is cleared when a connection created by an application operation thread
// encounters a network timeout during handshaking. Unlike the non-timeout test below, we only test
// connections created in the foreground for timeouts because connections created by the pool
// maintenance routine can't be timed out using a context.

appName := "authNetworkTimeoutTest"
mt.Run("pool not cleared on operation-scoped network timeout", func(mt *mtest.T) {
// Assert that the pool is not cleared when a connection created by an application
// operation thread encounters an operation timeout during handshaking. Unlike the
// non-timeout test below, we only test connections created in the foreground for
// timeouts because connections created by the pool maintenance routine can't be
// timed out using a context.

appName := "authOperationTimeoutTest"
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
// speculative auth.
mt.SetFailPoint(mtest.FailPoint{
Expand All @@ -70,24 +70,61 @@ func TestSDAMErrorHandling(t *testing.T) {
},
})

// Reset the client with the appName specified in the failpoint.
clientOpts := options.Client().
SetAppName(appName).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor)
mt.ResetClient(clientOpts)
clearPoolChan()
// Reset the client with the appName specified in the failpoint and the pool monitor.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

// The saslContinue blocks for 150ms so run the InsertOne with a 100ms context to cause a network
// timeout during auth and assert that the pool was cleared.
// Use a context with a 100ms timeout so that the saslContinue delay of 150ms causes
// an operation-scoped context timeout (i.e. a timeout not caused by a client timeout
// like connectTimeoutMS or socketTimeoutMS).
timeoutCtx, cancel := context.WithTimeout(mtest.Background, 100*time.Millisecond)
defer cancel()
_, err := mt.Coll.InsertOne(timeoutCtx, bson.D{{"test", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
assert.False(mt, tpm.IsPoolCleared(), "expected pool not to be cleared but was cleared")
})

mt.Run("pool cleared on non-operation-scoped network timeout", func(mt *mtest.T) {
// Assert that the pool is cleared when a connection created by an application
// operation thread encounters a timeout caused by connectTimeoutMS during
// handshaking.

appName := "authConnectTimeoutTest"
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
// speculative auth.
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"saslContinue"},
BlockConnection: true,
BlockTimeMS: 150,
AppName: appName,
},
})

// Reset the client with the appName specified in the failpoint and the pool monitor.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().
SetAppName(appName).
SetPoolMonitor(tpm.PoolMonitor).
// Set a 100ms socket timeout so that the saslContinue delay of 150ms causes a
// timeout during socket read (i.e. a timeout not caused by the InsertOne context).
SetSocketTimeout(100 * time.Millisecond))

// Use context.Background() so that the new connection will not time out due to an
// operation-scoped timeout.
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
})

mt.RunOpts("pool cleared on non-timeout network error", noClientOpts, func(mt *mtest.T) {
mt.Run("background", func(mt *mtest.T) {
// Assert that the pool is cleared when a connection created by the background pool maintenance
Expand All @@ -106,16 +143,19 @@ func TestSDAMErrorHandling(t *testing.T) {
},
})

clientOpts := options.Client().
// Reset the client with the appName specified in the failpoint.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().
SetAppName(appName).
SetMinPoolSize(5).
SetPoolMonitor(poolMonitor)
mt.ResetClient(clientOpts)
clearPoolChan()
SetPoolMonitor(tpm.PoolMonitor).
// Set minPoolSize to enable the background pool maintenance goroutine.
SetMinPoolSize(5))

time.Sleep(200 * time.Millisecond)
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")

assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
})

mt.Run("foreground", func(mt *mtest.T) {
// Assert that the pool is cleared when a connection created by an application thread connection
// checkout encounters a non-timeout network error during handshaking.
Expand All @@ -133,24 +173,23 @@ func TestSDAMErrorHandling(t *testing.T) {
},
})

clientOpts := options.Client().
SetAppName(appName).
SetPoolMonitor(poolMonitor)
mt.ResetClient(clientOpts)
clearPoolChan()
// Reset the client with the appName specified in the failpoint.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")
assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
})
})
})
})
mt.RunOpts("after handshake completes", baseMtOpts(), func(mt *mtest.T) {
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
mt.Run("pool cleared on non-timeout network error", func(mt *mtest.T) {
clearPoolChan()
appName := "afterHandshakeNetworkError"

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Expand All @@ -159,16 +198,22 @@ func TestSDAMErrorHandling(t *testing.T) {
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
CloseConnection: true,
AppName: appName,
},
})

// Reset the client with the appName specified in the failpoint.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"test", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")
assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
})
mt.Run("pool not cleared on timeout network error", func(mt *mtest.T) {
clearPoolChan()
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.Nil(mt, err, "InsertOne error: %v", err)
Expand All @@ -181,11 +226,11 @@ func TestSDAMErrorHandling(t *testing.T) {
_, err = mt.Coll.Find(timeoutCtx, filter)
assert.NotNil(mt, err, "expected Find error, got %v", err)
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)

assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
})
mt.Run("pool not cleared on context cancellation", func(mt *mtest.T) {
clearPoolChan()
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.Nil(mt, err, "InsertOne error: %v", err)
Expand All @@ -204,8 +249,7 @@ func TestSDAMErrorHandling(t *testing.T) {
assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
assert.True(mt, errors.Is(err, context.Canceled), "expected error %v to be context.Canceled", err)

assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
})
})
mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
Expand Down Expand Up @@ -242,28 +286,32 @@ func TestSDAMErrorHandling(t *testing.T) {
}
for _, tc := range testCases {
mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
clearPoolChan()
appName := fmt.Sprintf("command_error_%s", tc.name)

// Cause the next insert to fail with an ok:0 response.
fp := mtest.FailPoint{
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
ErrorCode: tc.errorCode,
AppName: appName,
},
}
mt.SetFailPoint(fp)
})

// Reset the client with the appName specified in the failpoint.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

runServerErrorsTest(mt, tc.isShutdownError)
runServerErrorsTest(mt, tc.isShutdownError, tpm)
})
mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
clearPoolChan()
appName := fmt.Sprintf("write_concern_error_%s", tc.name)

// Cause the next insert to fail with a write concern error.
fp := mtest.FailPoint{
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
Expand All @@ -273,32 +321,36 @@ func TestSDAMErrorHandling(t *testing.T) {
WriteConcernError: &mtest.WriteConcernErrorData{
Code: tc.errorCode,
},
AppName: appName,
},
}
mt.SetFailPoint(fp)
})

// Reset the client with the appName specified in the failpoint.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

runServerErrorsTest(mt, tc.isShutdownError)
runServerErrorsTest(mt, tc.isShutdownError, tpm)
})
}
})
})
}

func runServerErrorsTest(mt *mtest.T, isShutdownError bool) {
func runServerErrorsTest(mt *mtest.T, isShutdownError bool, tpm *testPoolMonitor) {
mt.Helper()

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")

// The pool should always be cleared for shutdown errors, regardless of server version.
if isShutdownError {
assert.True(mt, isPoolCleared(), "expected pool to be cleared, but was not")
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared, but was not")
return
}

// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
wantCleared := mtest.CompareServerVersions(mtest.ServerVersion(), "4.2") < 0
gotCleared := isPoolCleared()
assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %v; pool was cleared: %v",
gotCleared := tpm.IsPoolCleared()
assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %t; pool was cleared: %t",
wantCleared, gotCleared)
}
Loading

0 comments on commit 5199a0b

Please sign in to comment.