Skip to content

Commit

Permalink
move type assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 17, 2021
1 parent 1e840e8 commit acaeb64
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
33 changes: 17 additions & 16 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ type exitIdle struct{}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
hasExitIdle bool
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
Expand All @@ -65,6 +66,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdle)
return ccb
}

Expand Down Expand Up @@ -93,17 +95,12 @@ func (ccb *ccBalancerWrapper) watcher() {
case exitIdle:
if ccb.cc.GetState() == connectivity.Idle {
if ei, ok := ccb.balancer.(balancer.ExitIdle); ok {
// We already checked that the balancer implements
// ExitIdle before pushing the event to updateCh, but
// check conditionally again as defensive programming.
ccb.balancerMu.Lock()
ei.ExitIdle()
ccb.balancerMu.Unlock()
} else {
// Fallback path for LB policies that don't support
// ExitIdle.
ccb.cc.mu.Lock()
for ac := range ccb.cc.conns {
go ac.connect()
}
ccb.cc.mu.Unlock()
}
}
default:
Expand Down Expand Up @@ -138,8 +135,12 @@ func (ccb *ccBalancerWrapper) close() {
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) exitIdle() {
func (ccb *ccBalancerWrapper) exitIdle() bool {
if !ccb.hasExitIdle {
return false
}
ccb.updateCh.Put(exitIdle{})
return true
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
Expand Down Expand Up @@ -168,8 +169,8 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat

func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
ccb.balancerMu.Unlock()
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down
7 changes: 5 additions & 2 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,11 @@ func (cc *ClientConn) GetState() connectivity.State {
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper != nil {
cc.balancerWrapper.exitIdle()
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
}

Expand Down

0 comments on commit acaeb64

Please sign in to comment.