Skip to content

Commit

Permalink
Fix issue with context being cancelled prematurely (#70)
Browse files Browse the repository at this point in the history
There was a bug where we would cancel the health-checker's context after
warming up the connection, which means that health checkers that rely on
the context would start permanently failing after the first check.

To reproduce, I updated the `FakeHealthChecker` used in tests to be
context-sensitive: it uses `context.AfterFunc` to permanently mark a
connection as unhealthy if/when the context is cancelled.

One of my local test runs then hung, revealing a potential deadlock bug.
The issue was that the balancer would acquire a lock and then call
`check.Close()`. For the `FakeHealthChecker`, closing then acquires the
health checker's lock. But over in the
`FakeHealthChecker.UpdateHealthState`, we acquire locks in the reverse
order: acquiring the health checker's lock and then calling
`tracker.UpdateHealthState` (which ends up calling into the balancer and
acquiring the balancer's lock). Having the lock acquisition order differ
exposes a potential deadlock.

The deadlock is also fixed in this branch, by making it so that we don't
ever try to acquire both locks at the same time, much less acquire them
in different orders.
  • Loading branch information
jhump authored Nov 20, 2024
1 parent bb123e9 commit 052fe86
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
19 changes: 12 additions & 7 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
b.pool.RemoveConn(c)
}
}()
var checkClosers []io.Closer
defer func() {
for _, c := range checkClosers {
_ = c.Close()
}
}()
b.mu.Lock()
defer b.mu.Unlock()
newConns := make([]conn.Conn, 0, len(b.conns)+numAdded-numRemoved)
Expand All @@ -312,7 +318,7 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
delete(b.connInfo, existing)
info.cancel()
if info.closeChecker != nil {
_ = info.closeChecker.Close()
checkClosers = append(checkClosers, info.closeChecker)
}
continue
}
Expand All @@ -331,12 +337,6 @@ func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
connection := conns[i]
connCtx, connCancel := context.WithCancel(b.ctx)
healthChecker := b.healthChecker.New(connCtx, connection, b)
go func() {
defer connCancel()
if err := connection.Prewarm(connCtx); err == nil {
b.warmedUp(connection)
}
}()
cancel := connCancel
if b.roundTripperMaxLifetime != 0 {
timer := b.clock.AfterFunc(b.roundTripperMaxLifetime, func() {
Expand All @@ -348,6 +348,11 @@ func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
}
}
b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancel: cancel}
go func() {
if err := connection.Prewarm(connCtx); err == nil {
b.warmedUp(connection)
}
}()
}
}

Expand Down
26 changes: 24 additions & 2 deletions internal/balancertesting/balancertesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func NewFakeHealthChecker() *FakeHealthChecker {
// New implements the healthchecker.Checker interface. It will use the
// given tracker to mark the given connection with the currently configured
// initial health state (which defaults to health).
func (hc *FakeHealthChecker) New(_ context.Context, connection conn.Conn, tracker health.Tracker) io.Closer {
func (hc *FakeHealthChecker) New(ctx context.Context, connection conn.Conn, tracker health.Tracker) io.Closer {
hc.mu.Lock()
defer hc.mu.Unlock()
state := hc.initialState
Expand All @@ -357,6 +357,14 @@ func (hc *FakeHealthChecker) New(_ context.Context, connection conn.Conn, tracke
if ch := hc.initialized[connection]; ch != nil {
close(ch)
}
context.AfterFunc(ctx, func() {
// Automatically force state to unhealthy after context is cancelled.
tracker := hc.updateHealthState(connection, health.StateUnhealthy, true)
if tracker == nil {
return
}
tracker.UpdateHealthState(connection, health.StateUnhealthy)
})
}()
select {
case hc.checkersUpdated <- struct{}{}:
Expand All @@ -378,10 +386,24 @@ func (hc *FakeHealthChecker) New(_ context.Context, connection conn.Conn, tracke

// UpdateHealthState allows the state of a connection to be changed.
func (hc *FakeHealthChecker) UpdateHealthState(connection conn.Conn, state health.State) {
tracker := hc.updateHealthState(connection, state, false)
if tracker != nil {
tracker.UpdateHealthState(connection, state)
}
}

func (hc *FakeHealthChecker) updateHealthState(connection conn.Conn, state health.State, removeTracker bool) health.Tracker {
hc.mu.Lock()
defer hc.mu.Unlock()
hc.trackers[connection].UpdateHealthState(connection, state)
tracker := hc.trackers[connection]
if tracker == nil {
return nil
}
hc.conns[connection] = state
if removeTracker {
delete(hc.trackers, connection)
}
return tracker
}

// SetInitialState sets the state that new connections will be put into
Expand Down

0 comments on commit 052fe86

Please sign in to comment.